This repository has been archived by the owner on Dec 7, 2022. It is now read-only.

3570 revised changesets #3483

Closed · wants to merge 2 commits

Conversation

@bmbouter (Member) commented May 1, 2018

https://pulp.plan.io/issues/3570

Builds on PR #3464. Those diffs appear here as well but are not part of this PR.

@pep8speaks commented May 1, 2018

Hello @bmbouter! Thanks for updating the PR.

Cheers ! There are no PEP8 issues in this Pull Request. 🍻

Comment last updated on May 31, 2018 at 22:28 UTC

This replaces ChangeSets as the interface.
)
[i for i in changeset.apply()]

def _mirror(self, new_version):
@jortel (Contributor) commented May 7, 2018

The ChangeSet applies the additions, then removals.

Suggest:

def _mirror(self):
    retained = set()

    def additions():
        for content in self.pending_content:
            key = content.model.natural_key()
            retained.add(key)
            yield content
    
    def removals():
        base_version = RepositoryVersion.latest(self.repository)
        for content in base_version.content:
            key = content.cast().natural_key()
            if key not in retained:
                yield content

    changeset = ChangeSet(
      self.remote,
      self.new_version,
      additions=additions(),
      removals=removals()
    )
    
    [i for i in changeset.apply()]


class PendingVersion:

def __init__(self, pending_content, repository, remote, sync_mode='mirror'):
Review comment (Contributor):

The plugin should interpret the sync_policy name for 2 reasons:

  1. This object will only be able to deal with either mirror or not-mirror (additive).
  2. The plugin can accept any policy name in addition to 'mirror' and 'additive'.

Recommend: (bool) mirror instead of (str) sync_policy.
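For example, the split of responsibility could look like this (the function and policy names are illustrative, not part of this PR):

def interpret_sync_policy(sync_policy):
    """Plugin-side translation of a policy name into the boolean core needs."""
    # The plugin may accept any number of policy names ('mirror', 'additive',
    # 'latest-only', ...); only the plugin knows which of them imply mirroring.
    return sync_policy == 'mirror'

# Core then only ever sees a bool:
# PendingVersion(pending_content, repository, remote, mirror=interpret_sync_policy(policy))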

self.remote = remote
self.sync_mode = sync_mode

def apply(self):
Review comment (Contributor):

I think create() is more appropriate than apply(). I also think it would be appropriate for this method to return the newly created RepositoryVersion to the plugin.

@jortel (Contributor) commented May 7, 2018

A more general observation is that the goal was to provide the differencing logic (here) for the plugin writer. I'm concerned that, for additions, the PendingVersion seems to have dropped the differencing logic altogether. The ChangeSet is designed to deal with this, but a slight misuse (of the ChangeSet) may result in:

  • unnecessary DB lock contention and updates.
  • progress reporting that is confusing to the user

Please consider implementing the differencing logic for additions. This should involve simple logic in the additions generator to omit pending content by natural key based on a set of keys already in the repository.

@bmbouter (Member, Author) commented May 7, 2018

I see what you are saying about the differencing code. Since bringing back the differencing code is required to make this correct, we should check in on the overall approach.

I see the goal as slightly different from what you've written. I'll restate my understanding. The goal is to make plugin writers' lives easier by creating the PendingVersion with the interface described here.

In order to do that, PendingVersion can be implemented in several ways:

  • Have PendingVersion use the Changeset "as is" (option 1)
  • Merge PendingVersion and Changeset into one thing (option 2)
  • Adjust Changeset to fulfill the PendingVersion use case better (option 3)

We're using option 1 currently. The reasoning is that it minimizes effort, since the Changeset already provides the feature that determines whether content is present in Pulp and present in the repo version. If I also do that in PendingVersion, then PendingVersion is doing effectively the same thing, which is a redundancy. Building that redundancy would also take extra work, which makes option 1 no longer the low-effort option.

From the coding I've done so far, I believe option 2 would result in significantly less code to maintain and is now also the lowest-effort option. It also architecturally eliminates any redundancy, since they become one thing. Option 3 is also viable, but we would need a great reason to choose it over option 2, which I don't see currently.

@jortel what do you think we should do?

@jortel (Contributor) commented May 7, 2018

The ChangeSet is not determining what is (or should be) in the repository. That is the job of the plugin or the PendingVersion, so nothing is redundant. The ChangeSet is responsible only for effecting the requested set of changes (add/remove) on the repository version.

This includes:

  • adding or removing RepositoryContent as requested.
  • detecting whether Content exists in Pulp; de-duplicating or creating as needed.
  • detecting whether an Artifact exists in Pulp; de-duplicating or downloading/creating as needed.
  • adding/updating/removing RemoteArtifact as needed.
  • progress reporting on ^.

(Option 1), having the PendingVersion use the ChangeSet as-is supports a clean/clear separation of concerns without any duplication. I don't see any changes needed in the ChangeSet to support PendingVersion. Both the ChangeSet and the PendingVersion will be more useful and maintainable if layered and used in collaboration instead of combined. Also, no decision has been made as to whether or not the ChangeSet will remain in the plugin API.

(Option 2 & 3) would be significantly more effort and not result in any code de-duplication. We should stick with (option 1) as planned.

@jortel (Contributor) commented May 8, 2018

I'd anticipated something simple like this:

from .changeset import ChangeSet
from .models import RepositoryVersion


class PendingVersion:

    def __init__(self, repository, remote, content, mirror):
        self.remote = remote
        self.repository = repository
        self.content = content
        self.mirror = mirror

    def _additions(self, added):
        base = RepositoryVersion.latest(self.repository)
        associated = {c.cast().natural_key() for c in base.content}
        for content in self.content:
            key = content.model.natural_key()
            added.add(key)
            if key not in associated:
                yield content

    def _removals(self, added):
        if not self.mirror:
            return ()
        base = RepositoryVersion.latest(self.repository)
        for content in base.content:
            key = content.cast().natural_key()
            if key not in added:
                yield content

    def create(self):
        added = set()
        with RepositoryVersion.create(self.repository) as new_version:
            changeset = ChangeSet(
                remote=self.remote,
                repository_version=new_version,
                additions=self._additions(added),
                removals=self._removals(added))
            [_ for _ in changeset.apply()]
            return new_version
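Usage from a plugin's sync task would then be roughly (assuming the constructor shown above and a plugin-provided generator of PendingContent):

pending_version = PendingVersion(
    repository=repository,
    remote=remote,
    content=pending_content(remote),  # plugin-provided iterable of PendingContent
    mirror=True)
new_version = pending_version.create()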

@bmbouter (Member, Author) commented May 9, 2018

After thinking about this some more, I think option 1 (keeping PendingVersion and Changeset as separate objects) leads to less efficient db performance, more code, and higher memory usage.

Here is an example. Assume that we have the unit keys in memory that we want to remove, like in my PR here. The easiest thing to do would be to find-and-delete those using the db directly with code just like these few lines here. Implemented in PendingVersion, that delete would be one single query instead of a query per content unit in the removals iterator, so it would be more efficient in how it interacts with the database. The "stream-based" removals iterator isn't a good fit for PendingVersion's needs because it already has the entire list in memory, so operating on it element-by-element against the db is inefficient. Additionally, by having to call into the Changeset to get access to those lines, we add more code (more maintenance) and build additional objects in memory, which is less memory efficient.
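For illustration, a hedged sketch of that kind of single-query removal, assuming the natural keys are already in memory and using made-up key field names:

from functools import reduce
from operator import or_

from django.db.models import Q


def bulk_remove(queryset, natural_keys, key_fields=('name', 'version')):
    """Delete every row whose natural key is in natural_keys using one query.

    key_fields is illustrative; each content type defines its own key fields.
    """
    if not natural_keys:
        return
    condition = reduce(or_, (Q(**dict(zip(key_fields, key))) for key in natural_keys))
    queryset.filter(condition).delete()  # one DELETE, not one per unit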

You can also see the extra work and complexity option 1 brings in various ways. One is the differencing code that would still need to be added. Another is this additional closure here. If the main loop draining the iterator were in PendingVersion, we wouldn't need this additional closure. To me, this suggests our design is too complex.

This gets at something that you asked as well: whether Changeset is going to be offered in the plugin API or not. I think to include it, we would need a use case that the PendingVersion doesn't solve. I don't know of one, and logically speaking all plugins should be able to express their stream of PendingContent objects to PendingVersion just the same as to Changeset. Can we think of a use case where the declarative interface won't meet the needs? If we can, then that would be a great reason to include it in the plugin API.

I think we need to get input and perspectives and reach an agreement before more coding is done. Discussing it on the issue has been great. Also, I think this API change will be very beneficial to plugin writers; for example, look at how significantly the amount of code in pulp_file drops when using the declarative interface.

@jortel (Contributor) commented May 9, 2018

I don't think your assertions that (option 1) "leads to less efficient db performance, more code, and higher memory usage" are substantiated.

DB performance

The example of DB performance re: removals is not correct. The ChangeSet uses transactions to delete RepositoryContent and RemoteArtifact records in batches of 1000. Within a transaction, each delete has zero DB overhead until the entire transaction is committed. The difference between using the transactions and a single DELETE statement with 1000 natural keys is negligible. The ChangeSet doesn't do anything in the DB item-by-item.
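The batching pattern being described is roughly the following (illustrative only, not the actual ChangeSet code):

from itertools import islice

from django.db import transaction


def delete_in_batches(querysets, batch_size=1000):
    """Delete the rows matched by each queryset, one transaction per batch of 1000."""
    iterator = iter(querysets)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            break
        with transaction.atomic():
            for queryset in batch:
                queryset.delete()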

More Code

I just don't see it. The differencing logic needs to be added somewhere. Using the ChangeSet is literally 2 lines of code. This example #3483 (comment) shows just how little code needs to be in the PendingVersion and just how simple and maintainable it would be.

Higher memory usage

Given that the PendingVersion requires the plugin writer to always instantiate all of the content & artifact objects (which for many repositories will be in the tens of thousands), fetching and keeping Content objects in memory (to be removed) 1000 at a time hardly seems relevant. Although, my code example here: #3483 (comment) needs to be modified to limit the fields on the fetched Content objects.

Next

Agreed. Let's get more people involved in the discussion.

@bmbouter (Member, Author):

I want to look at the design more to talk through these concerns. To simplify things, let's just look at the DB concerns. I wanted to be sure of my understanding of how the 1000-DELETE example loads the database, so I asked in #postgresql on Freenode. They confirmed a few things:

  1. Every DELETE statement will load the DB when it's executed because it is actually changing the data right then, not when the transaction is committed. All the transaction is doing is hiding that change from other transactions that are also open and checking for write-write conflicts. They gave me a list of links to read that talk more about these details (at bottom). So the DB performance concern is accurate, and wrapping it in a transaction isn't resolving the performance issue.

  2. One DELETE statement with 1000 WHERE conditions is more efficient than 1000 individual DELETE statements in terms of raw DB execution time. We don't know how important this is, but if one way is more efficient we should do that.

  3. This design goes over the network twice for each unit deleted and runs Python code in between the DELETE statements, all of which is slow. Each DELETE is sent to the db (maybe via the network), executed by the db, and the result returned (maybe via the network); then more Python loop code executes before the next DELETE statement is issued. A single DELETE statement with 1000 WHERE conditions would go over the network once for the entire 1000.

I was looking at the Changeset code. I think the Changeset performs removals against the db item-by-item here? It builds the queryset directly and then executes it for that single content unit. The transaction wrapping doesn't make it not item-by-item.

I see a lot of ^ as a design mismatch. The Changeset removals expect a stream, but the PendingVersion already has all of the removal keys in memory. Having PendingVersion make a bulk db call directly would, I think, be efficient and require minimal code.

One concern you mentioned, and that I recognize, is the separation of concerns. What if we had fewer classes with shorter, well-documented methods on a single class? This would still keep the separation of concerns at the method level on PendingVersion and integrate the Changeset code into it in a way that is organized and keeps concerns separated.

What do you think we should do about these ideas? What are the problems with them? What can we do that is better?

Also, what about the key question of whether we are offering Changeset as part of the plugin API? That would hugely inform this implementation. Assuming PendingVersion provides its declarative additive and mirror use cases, are there plugin writer use cases above and beyond that? Is the Changeset feature set the best way to address those use cases? Do you have some thoughts on these use case aspects?

http://www.postgresql.org/docs/current/static/mvcc.html
http://en.wikipedia.org/wiki/Multiversion_concurrency_control
http://devcenter.heroku.com/articles/postgresql-concurrency
http://momjian.us/main/presentations/internals.html#mvcc
https://brandur.org/postgres-atomicity

@bmbouter (Member, Author):

I don't want to overrun the thread with comments, but I just thought of something significant enough to write out. Since the Changeset was originally built, a lot has changed in pulpcore. These changes affect the Changeset feature set because some of the things Changeset was providing are now resolved at the architectural level. The main changes I'm talking about are:

  1. Repository versions
  2. A crash-safety cleanup mechanism of RepositoryVersions using the complete=False flag
  3. Orphan cleanup that cleans up both orphaned Artifacts and Content Units

One point of value I always saw in the Changeset is that its transactions roll back in the event of a crash, which preserves several aspects of consistency. Specifically, it avoids:

  1. Orphaned Artifacts that are orphaned due to the crash occurring before the artifact is associated with a content unit
  2. Orphaned Content units that are orphaned due to the crash occurring before the created content unit is associated with a given repo
  3. An inconsistent repository due to some changes being applied and a crash occurring before all of them are applied.

With the core changes mentioned, the above situations aren't issues for plugin writers anymore. With RepositoryVersion and complete=False, if a crash occurs the RepositoryVersion will be deleted, so the third issue isn't a problem anymore. For issues (1) and (2), when the crashed task is rerun, those existing Artifacts and Content Units will be recognized as present and not re-downloaded or re-created, so that is efficient. If they are never needed again, orphan cleanup will correctly remove them, so that is good too.

@jortel (Contributor) commented May 10, 2018

I'm convinced the single delete statement matching 1000 records is more efficient. Mainly it reduces the statement parsing and round-trips to the DB. I'm not sure the difference is significant, but I agree we should always try to be as efficient as possible. That said, the ChangeSet can very easily be changed to delete (removals) using a single delete statement matching multiple records. So the delete DB efficiency and item-by-item points don't seem relevant to the design decision.

I don't see the PendingVersion using the ChangeSet to do the removals as a design mismatch re: streams processing. Although the PendingVersion has all the natural keys, it still needs to select which keys should be removed. Passing the ChangeSet a generator is less code in the PendingVersion and is consistent with streams processing. Also, generators are very efficient.

Grouping the methods within a single class by separation of concerns is a yellow flag and confirms they should be 2 separate classes. It's analogous to suggesting that one big function containing separated code blocks is somehow better than having the same functionality separated into multiple functions, each doing one thing (having a single concern).

@jortel (Contributor) commented May 10, 2018

The ChangeSet intentionally groups DB updates (as you described) into transactions for the purpose of preventing anomalies. But, the transactions are not the primary value proposition of the ChangeSet. I listed them here.

@bmbouter (Member, Author) commented May 10, 2018

I don't think the analogy that multiple methods on a single class is similar to multiple code blocks in a single function holds up, in at least two ways. A single function requires you to run all code blocks at once, while small methods can be run in a fine-grained way. Also look at the testing differences: each method can be clearly and easily tested, while a large monolithic function with code blocks cannot. I'm pointing this out because monolithic functions are a well-known bad practice. I think having one object with various simple methods would allow each method to have a separation of concerns.

What I'm advocating is to move some of the methods from Changeset to PendingVersion and remove the rest of Changeset. I don't know of use cases that would motivate offering Changeset in the plugin API. This would result in significantly less code, which I think would be a really good thing. I also think fewer layers would make the entire thing easier to debug.

@jortel (Contributor) commented May 11, 2018

The function/code-blocks analogy was meant only as a separation-of-concerns analogy, not a modularity argument. Just as functions should do one thing and delegate other functionality to other functions, classes (objects) should do one thing and delegate other behavior to other objects.

Ah, so you are only talking about merging the ChangeSet and PendingVersion classes and not re-implementing the entire changeset package in the PendingVersion class. That would be less concerning since the ChangeSet delegates well to other objects. Though, I still think it's better to leave them separated.

@jortel (Contributor) commented May 11, 2018

Let me suggest a path forward. The main goal is to provide the PendingVersion abstraction to plugin writers. Since we seem to agree on its value (for at least most plugins) and its interface, I propose an incremental approach that requires the least amount of code change. Let's add the PendingVersion based on this simple 40 lines of code. This gets the PendingVersion out into the wild and into the hands of plugin writers quickly so we can start getting feedback about the long-term usefulness of both the PendingVersion and the ChangeSet. By giving the community something tangible to critique, we can use that information to revisit decisions about merging the two classes and removing the ChangeSet from the plugin API.

How does this sound?
Can you suggest the type of information we solicit from the community to help us with these follow up decisions?

@bmbouter (Member, Author):

I should be clearer about the scope. I think we should look to simplify the code of the whole changeset package with this transition to the declarative interface. Consider the 'report' package: if Changeset isn't in the plugin API, then we don't need formal reporting outside of the Progress Reporting which is recorded on the task. We can remove report entirely. Also, the iterator package could be simplified a good amount. We can also remove a lot of the boilerplate getter/setter/property code. We would keep PendingContent, PendingArtifact, and PendingVersion for sure.

Overall I think improving the efficiency while reducing maintenance is a goal worth the effort. It would be higher performing, for all the reasons discussed on the PR so far, since it would be custom built to implement PendingVersion's feature set as efficiently as possible. Having less to maintain would also be a big win. It would also be easier to debug, which is another beneficial outcome of this type of change. There are so many layers in the call stack that I find it hard to debug easily. This would collapse several of those layers.

If an experimental branch were approved, we would know exactly what the changes are, could quantify how much more efficient it is, and could show how much code we would be saving for the same feature set.

How do we solicit input on going forward with something like this? What are the problems with this approach?

@bmbouter (Member, Author):

@jortel I agree the most important user-facing change is PendingVersion and we can make incremental changes to address all issues after we get that API in place. Can we check in on a few things:

I plan to have removals handled by one mass delete inside of PendingVersion itself. That is the exact point that got this PR discussion started actually. That would be the most efficient thing to do. Will that work for you?

Also, this only leaves PendingContent, PendingVersion, and PendingArtifact in the plugin API and removes Changeset and all other objects from the API. They are still left as-is internally in the code. Will this work for you?

Finally, having those things live in pulpcore.plugin.changeset is a little strange; I think they could come from a package named pulpcore.plugin.pending instead. What do you think about this?

@jortel (Contributor) commented May 11, 2018

Here you stated "I see the goal as slightly different from what you've written. I'll restate my understanding. The goal is to make plugin writer's lives easier by creating the PendingVersion with the interface described here."

Has this changed?


When discussing the single statement vs. multiple statements on #postgresql, I asked them to quantify the difference in expected performance and efficiency between the two methods. They recommended that we time both approaches, so I did. I tested removing 5000 content units from a repository, which results in deleting 5000 RemoteContent and updating 5000 RepositoryContent. My expectation, as noted in this comment, was that the difference would be insignificant.

Measurements:

  • Multiple (10k) statements within a transaction averaged: 25 seconds.
  • Two (single) statements, 1 delete (matching 5000) and 1 update (matching 5000), averaged: 30 seconds.

I found these results curious in several ways:

  1. The same changes in the DB using psql takes < 1 second. Why so long? We need to investigate; check indexes etc.
  2. The approach used by ChangeSet to effect removals is faster than the proposed optimization.

Implementing the removals in the PendingVersion seems both premature and beyond the scope of a minimal and incremental approach.


Any changes to the changeset package (except merging #3464) or to the plugin API, other than adding the PendingVersion, seem beyond the scope of a minimal and incremental approach.

@dralley (Contributor) commented May 13, 2018

I'd just like to point out that if we still hope to regain support for SQLite in the future (which seems increasingly unlikely at this point), we can't assume that we can make very large mass creations or deletions, due to the expression tree size limit.

https://pulp.plan.io/issues/3499

SQLite would limit us to working with 600-800 items at a time.
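If that limit did have to be honored, the bulk operations would need to be chunked; a rough sketch (the 500-item chunk size is illustrative):

from itertools import islice


def chunked(iterable, size=500):
    """Yield lists of at most `size` items so each statement stays small."""
    iterator = iter(iterable)
    while True:
        chunk = list(islice(iterator, size))
        if not chunk:
            return
        yield chunk

# e.g. one DELETE per chunk instead of one giant statement:
# for pks in chunked(pks_to_remove):
#     Content.objects.filter(pk__in=pks).delete()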

With respect to

the same changes in the DB using psql takes < 1 second. Why so long? We need to investigate; check indexes etc.

It could be that the Django ORM is emitting a really unoptimized query vs. the one that you're producing. Perhaps we should compare?

https://stackoverflow.com/questions/1074212/how-can-i-see-the-raw-sql-queries-django-is-running
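Comparing is straightforward with standard Django introspection; for example (queryset stands for whatever removal queryset is being timed):

from django.db import connection, reset_queries

# Print the SQL a queryset would run, without executing it:
print(str(queryset.query))

# Or, with DEBUG=True, inspect what was actually sent to the database:
reset_queries()
queryset.delete()
for query in connection.queries:
    print(query['time'], query['sql'])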

@bmbouter (Member, Author):

I want to go back to the high level with the use cases. I think understanding what use cases we are solving and how we are fulfilling them is the best way for us to navigate how we serve our users. From that perspective, I don't think we're committed to keeping any of the code we have now due to sunk-cost reasoning. If what we have is the best way to fulfill the use cases then great; if not, then we need to make an adjustment.

Here are the use cases I see the plugin writers needing from us:

  1. As a plugin writer, I can add a batch of content to a RepositoryVersion when expressed as PendingContent and associated PendingArtifact objects. While handling these, core will ensure:
  • PendingArtifact objects are auto-downloaded and saved as Artifacts
  • PendingContent objects are auto-checked for whether they are already in Pulp and whether they are identical

  2. As a plugin writer, I can perform the adding described ^ using stream processing. This also adds progress reporting.

  3. As a plugin writer, I can have core automatically remove any content units that didn't correspond with a PendingContent object emitted. This is effectively a mirroring mode. This would come with progress reporting.

I think these are the 3 use cases that plugin writers need in this area. Are there more requirements? Do people agree or disagree with aspects of these?

In terms of approach, after the use cases are known, we'll have a solid foundation to move into a discussion about implementation options.

@jortel (Contributor) commented May 16, 2018

The organization into 3 use cases (some having details: bullets) does not resonate well with me. Rather, it seems more like 2 use cases with details.

Terms:
A content-set is a Collection or Iterable of PendingContent. Each PendingContent object is fully constructed including related PendingArtifact. Reminder: a generator IsA Iterable.


As a plugin writer, I want the plugin API to provide an object that can be used to create a new RepositoryVersion containing exactly the content specified by the content-set.

  • Add RepositoryContent as needed to match the content-set.
  • Remove RepositoryContent as needed to match the content-set.
  • Detect whether Content exists in pulp; de-duplicate or create as needed.
  • Atomic creation of Content, ContentArtifact and RemoteContent to protect against storing incomplete Content.
  • Detect whether an Artifact exists in pulp; de-duplicate or download/create as needed.
  • Add/update/remove RemoteArtifact as needed to match the content-set.
  • Report progress on content added.
  • Report progress on content removed.
  • Report progress on artifacts downloaded.
  • The new RepositoryVersion is complete=True on success and discarded on error.
  • Use streams processing to constrain memory footprint.
  • Batch DB queries for efficiency and support using broad range of databases.

As a plugin writer, I want the plugin API to provide an object that can be used to create a new RepositoryVersion containing at-least the content specified in the content-set.

  • Add RepositoryContent as needed to match the content-set.
  • Detect whether Content exists in pulp; de-duplicate or create as needed.
  • Atomic creation of Content, ContentArtifact and RemoteContent to protect against storing incomplete Content.
  • Detect whether an Artifact exists in pulp; de-duplicate or download/create as needed.
  • Add/update RemoteArtifact as needed to match the content-set.
  • Report progress on content added.
  • Report progress on artifacts downloaded.
  • The new RepositoryVersion is complete=True on success and discarded on error.
  • Use streams processing to constrain memory footprint.
  • Batch DB queries for efficiency and support using broad range of databases.

@jortel (Contributor) commented May 18, 2018

@gmbnomis, one option would be for your content-set generator to yield a subclass of PendingContent and override save().

Something like:

class PendingCookbook(PendingContent):

    def save(self):
        # REST call; enrich the metadata
        super().save()

Thoughts?

@bmbouter (Member, Author):

@gmbnomis and @jortel I've also been thinking about that same use case. It's where the plugin writer already knows the items they want to add or remove, but they don't know if Pulp has already downloaded and created them. This is similar to what PendingVersion will do, but you don't have to go through all the items.

We should consider integrating this functionality with RepositoryVersion since that is what your PendingContent objects will be added to and removed from. It seems related to the RepositoryVersion.add_content and RepositoryVersion.remove_content interface. We could have it accept an iterable of PendingContent units, which would provide access to the Changeset features but through an interface the plugin writer already knows about instead of a new thing to learn.

I think having fewer objects that the users need to learn, and having this functionality be available on RepositoryVersion would be good. One thing to point out is that the Changeset was written before RepositoryVersion so we couldn't have done this originally.

That interface would effectively provide "batching", so every time you call it that group is treated as a batch. The PendingVersion implementation would then use it also.

@gmbnomis:

@jortel: Doing this at that point in time feels a little bit odd. But there is also a real drawback when doing REST calls in .save(): If I get this correctly, we can't do REST calls concurrently here.

For example, in an initial sync (i.e. Pulp repo is empty), there will be many PendingContent units to add. If the REST calls for metadata happen in the synchronous .save(), they must happen synchronously as well. If the plugin computes a set of content units to add, the REST calls can be done concurrently for this set.

One could go one step further and do the metadata enrichment asynchronously as part of the PendingVersion streaming. One could have e.g. an async def finalize_add(self) method in PendingContent. Each time a PendingContent unit is ready because its artifacts have been downloaded, its metadata could be finalized, running in the same event loop as the downloads (this does not necessarily mean HTTP requests; some plugins might want to look inside the downloaded artifacts to obtain additional metadata).
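For example (a hypothetical subclass and hook; aiohttp is used only for illustration):

import aiohttp

from pulpcore.plugin.changeset import PendingContent  # import path as discussed above


class PendingCookbook(PendingContent):

    async def finalize_add(self):
        # Hypothetical hook awaited by core on the same event loop as the
        # downloads, once this unit's artifacts have settled.
        async with aiohttp.ClientSession() as session:
            async with session.get(self.metadata_url) as response:  # hypothetical attribute
                data = await response.json()
        self.model.description = data.get('description', '')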

But I think computing a set of content to add and then just fetch additional metadata for the set is much simpler.

@jortel (Contributor) commented May 18, 2018

@jortel: Doing this at that point in time feels a little bit odd. But there is also a real drawback when doing REST calls in .save(): If I get this correctly, we can't do REST calls concurrently here.

Good point.

We'll keep brainstorming, but it looks like ChangeSet may be a better fit for you.

@bmbouter (Member, Author):

@gmbnomis I want to go back to your use case. I see a similar "two layers of metadata" pattern in pulp_ansible and pulp_maven.

The solution you outlined would work, but I want to look at a failure scenario. The outlined solution has two phases. Phase 1 uses the declarative interface to determine what to add, and Phase 2 adds extra metadata for created content units. What about this failure scenario though:

  1. A failure occurs during Phase 2 while you are processing the Changeset's added content units with the additional metadata.
  2. A second sync occurs, and its Phase 1 looks for content units again from universe and believes (incorrectly) that those units are present and fully populated.
  3. Other unrelated repos containing that unit will be missing that metadata as well.

Code could be added to handle these failures, but if we handled that extra data fetching prior to saving, and as part of the stream, it would be a safer architecture for this type of processing. I believe a capability to update a "content unit that is in memory but not saved" step in the stream processing would be broadly useful. It could also inspect the locally downloaded data if that is what it needed to do to discover the data it needs to write.

To accomplish this extra "step", we could allow a plugin writer to specify a generator that receives the content units created by PendingVersion and yields updated content units. The updated content units from this generator's stream would be saved and associated. If unspecified by the plugin writer, we would use the following default implementation:

def NoChangeGenerator(content_unit_stream):
    for content_unit in content_unit_stream:
        yield content_unit

What do others think about this as a way to resolve the two-layer metadata problem at the architecture itself (with an optional step)?
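For comparison, a plugin-supplied step under this proposal might look like this (hypothetical; the Supermarket call is illustrative):

def SupermarketEnricher(content_unit_stream):
    """Receive content units created by PendingVersion, attach extra metadata,
    and yield them onward to be saved and associated."""
    for content_unit in content_unit_stream:
        extra = fetch_supermarket_metadata(content_unit)  # hypothetical helper
        content_unit.description = extra.get('description', '')
        yield content_unit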

@dkliban (Member) commented May 21, 2018

@bmbouter I like this idea, but I want to take it a step further. The solution you proposed is good for a 2-stage unit creation process. What happens when the plugin writer needs a 3-stage process? What about an n-stage process? Supporting a pipeline of work would be useful.

@jortel (Contributor) commented May 22, 2018

@bmbouter wrote:

What do others think about this as a way to resolve the two-layer metadata problem at the architecture itself (with an optional step)?

The goal is to provide the plugin writer with an opportunity to supplement the metadata for only the PendingContent selected to be added by the PendingVersion. A simpler approach would seem to be for the plugin writer to override PendingVersion._additions() described here: iterate the pending content yielded by super()._additions(), add the Supermarket metadata, and yield the supplemented PendingContent. Wouldn't this be sufficient?

@bmbouter (Member, Author):

The issue with this approach is that it limits concurrency. We can observe the problem with the Chef example. When PendingVersion._additions() is subclassed, inside that function code will create a new downloader to download the Supermarket API data for that unit. That Downloader will need to be scheduled onto the asyncio loop directly in PendingVersion._additions(). That download will finish, the PendingContent unit will be updated and passed along down the stream, and then it goes to download another one from the incoming stream of PendingContent units.

Here are the issues this creates. It's two halves of the same coin:

While blocking on the download occurring in PendingVersion._additions(), Pulp isn't productively handling other Python code. For example, the rest of the stream isn't running database queries, isn't creating model data, etc.

In addition to the Python code being blocked, other downloads are also not occurring, specifically the batch artifact downloads. When the batch artifact downloaders are running, the downloads initiated in PendingVersion._additions() are not.

What can we do about this problem?
Are there other issues we aren't considering?

@jortel (Contributor) commented May 22, 2018

@bmbouter, I'd anticipated the plugin code running in the _additions() override would schedule supermarket downloads using the asyncio loop so both the supermarket downloads and artifact downloads would all be done concurrently.

@gmbnomis:

Basically, all these approaches are 'pipelining' generators in one way or another (the output of one generator is the input to the next).

I am wondering if that goes well with using async. If one generator A uses run_until_complete to do things concurrently (download artifacts, do REST API requests for metadata, ...) and yields (like here), and the next in the pipeline (generator B) does the same, then we have two places where the event loop runs. When A yields, B may decide to work on a batch, and A's run_until_complete loop does not run anymore for some time. Vice versa, if B requests the next item from A, A may decide that it is time for a new batch and will run its run_until_complete loop, possibly interrupting B's loop.

I don't know what happens exactly in this case. Usually A and B will use the same event loop. Already-scheduled futures may even continue to run in the other generator's run_until_complete loop. However, these futures won't be picked up until the run_until_complete loop switches. (It might even get worse: if B decides to do requests synchronously, A's requests/downloads may time out in the meantime.)

I think we need more than pipelined iterators. Ideas that come to mind:

  1. Couple the stages asynchronously (using async functions/iterators or even generators). Everything in a sync runs in one big event loop.
  2. Use a "batching" protocol: Stages communicate with batches of objects, not single object. Every stage works on the entire batch before it yields the batch to the next stage.

@gmbnomis:

@bmbouter: Just saw your comment. I think we both describe the same problem here.

@jortel (Contributor) commented May 22, 2018

@bmbouter, @gmbnomis: Okay, I think I see the problem you're both describing. Thinking on this more.

@jortel (Contributor) commented May 22, 2018

Thought of another approach. The PendingContent could support the concept of attaching metadata downloaders. The ChangeSet could incorporate them into the normal stream of futures executed in its asyncio loop. The metadata downloader just needs to be a coroutine (or future) that updates the associated Content on completion. The PendingContent would consider itself settled when: 1) all artifacts have been settled (downloaded); and 2) all the metadata downloads have completed. The downloads could complete in any order, all using one loop.

I think this would be very straightforward to add.
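Roughly (the metadata_downloads attribute and the constructor arguments are illustrative, not existing API):

async def fetch_extra_metadata(content):
    """Plugin-supplied coroutine; updates the in-memory Content when it completes."""
    content.model.description = await get_supermarket_description(content)  # hypothetical helper


pending = PendingContent(model, artifacts=artifacts)              # illustrative constructor
pending.metadata_downloads.append(fetch_extra_metadata(pending))  # hypothetical attribute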

Thoughts?

@gmbnomis:

@jortel: I think this is similar to the async def finalize_add(self) method I proposed above. Your proposal works well for me.

To address @bmbouter's concern about error handling: IMHO, the sync logic should not add a content unit to the add set if its metadata download raises/returns an error (and it must not be removed from the new repo version)

Re naming: I would not call them metadata downloaders, as they may also get metadata from the downloaded artifact. I am bad at naming; no alternative proposal from my side 😀

@jortel (Contributor) commented May 22, 2018

To address @bmbouter's concern about error handling: IMHO, the sync logic should not add a content unit to the add set if its metadata download raises/returns an error (and it must not be removed from the new repo version)

Agreed. The ChangeSet will not add/create content that has encountered an error. Instead, it is reported on the Task and the yielded ChangeReport.

Re naming: I would not call them metadata downloaders, as they may also get metadata from the downloaded artifact. I am bad at naming; no alternative proposal from my side 😀

Fair enough about the naming. I leaned toward downloader but we could go with something future related. The main thing is that each be a Future (or coroutine).

@dkliban (Member) commented May 23, 2018

@jortel @gmbnomis What about the ability to define a pipeline of work for creation of a content unit?

A solution that provides a way for a plugin writer to express the unit creation process as a pipeline of asynchronous work would be great.

@bmbouter (Member, Author):

@jortel, that would work around the problem for this specific use case, but my concern is that the concurrency model of the Changesets is incompatible with the concurrency of the Downloaders and asyncio more broadly. More content types will come, with more use cases that involve Downloaders and asyncio-based concurrency in various ways during stream processing. This convo has been super useful because I think we've uncovered an even larger problem than I realized. I believe we need to explore and possibly prototype an architectural solution.

Also consider that if we don't fix this, plugin writers can't use anything else asyncio-based in the pipeline either (not just downloaders). For instance, this would block us from using asyncpg while benefiting from its concurrency (which is the reason to use it). Also any of these things too. That's a lot of things to not be able to use during our stream processing.

We could get out of this situation if we adopted option A from @gmbnomis' comment. In practice, I think this will be easy as long as the components have well-defined types going in and out. Here is a SO post with some example code just like this. There are also libraries for this kind of thing, I believe.

This is not unlike what @dkliban was alluding to. The Plugin API can provide the asyncio-compatible raw components of the pipeline and also some pre-built pipelines from those components. If a plugin writer only needs the functionality of PendingVersion, there would be a pre-built pipeline they could use. Anyone wanting to do something custom (e.g. this use case from @gmbnomis) could inject their own steps at any point in the stream without any involvement from us. This would make the plugin API stream-processing based, fully concurrent, and asyncio compatible.

@gmbnomis:

@bmbouter: I think I see your point.

I thought we could support stages with the extension point I discussed with @jortel:

async def metadata_future(self, content):
    await self.stage1(content)
    await self.stage2(content)
    ...

Pulp core could run e.g. 10 of these and the artifact downloads concurrently. The 'stages' would run sequentially for each content unit.

But the latter may already be the problem: what if it is more efficient for the plugin to request metadata for more than one content unit at a time (i.e. it wants to include multiple content units in a request to the remote)? Or, in the extreme case, the plugin might need the entire "add set" before progressing.

Also I forgot about the iterator that feeds PendingVersion, which has the same problem as the stages after PendingVersion.

OTOH, I am wondering whether we ask too much of plugin writers if the Pulp plugin API offers a very powerful but hard-to-grasp, bleeding-edge, asynchronously coupled pipelining framework...

@dkliban (Member) commented May 24, 2018

@gmbnomis I agree that it's important to strike a balance between performance and ease of use. I am confident that we can achieve both in this case.

A lot of developers may not be familiar with asyncio right now, but that will change with time.

@bmbouter (Member, Author):

@gmbnomis and @dkliban I also believe we can deliver the power of asyncio-based concurrent streaming without the plugin writer having to understand how it works underneath, or even asyncio itself. PendingVersion could feel the same to them as it does today, but internally it would be built using a single asyncio loop and be entirely coroutine based. The PendingVersion.apply() method could handle the asyncio loop for them, similar to the BaseDownloader.fetch() method.
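A minimal sketch of the 'hidden loop' option, modeled on how BaseDownloader.fetch() wraps its coroutine (method names are illustrative):

import asyncio


class PendingVersion:

    async def _run(self):
        ...  # the coroutine-based pipeline discussed above

    def apply(self):
        """Synchronous entry point; the plugin writer never touches the loop."""
        loop = asyncio.get_event_loop()
        return loop.run_until_complete(self._run())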

An alternative to hiding the event loop would be to have the plugin writer schedule the PendingVersion coroutine onto the event loop themselves, which I think is something even beginners could do. It would be:

pv = PendingVersion(self, pending_content=self._pending_content, sync_mode='mirror')
asyncio.get_event_loop().run_until_complete(pv)

I find hiding the event loop a bit magical, so I prefer not hiding it (explicit > implicit), but I'm OK with either. Which of these two choices is preferable?

In terms of use cases, this would allow plugin writers to easily rebuild the pipeline as their content type needs, using the same steps used by PendingVersion. That is the significant difference from what we have today. Today the stream steps (a) aren't reusable, (b) aren't reorderable, (c) can't have arbitrary steps added, and (d) can't have steps removed. @gmbnomis I think it's (c) that would capture your use case and any future use cases in this area.

@jortel what do you think about these things?

I'm going to post some use cases here that reflect these recomposability requirements.

All responses and ideas are welcome.

@gmbnomis commented May 27, 2018

FWIW, I looked into the pipelining options a bit and cobbled together this script. The script's intention is to explore how async stages could be implemented; I did not bother to think about good abstractions and did not care about errors for most parts.

It requires Python 3.6 (async generators are supported beginning with Python 3.6. Alternatively, I could have used async iterators which are supported in 3.5 already, but they are not as elegant).

The basic idea is that we have a simple async function do_work() that does the work for a stage (for sake of simplicity, all stages use the same do_work function in the script).

The stages implement different methods to schedule the work (streaming sequentially, streaming concurrently, gather all content units first).

In the simplest case, the plugin writer would implement the worker function (or most probably class) and plug it into a pre-defined stage scheduler. At the beginning of the pipeline, it would be a function/generator that generates units. In more complex cases, the plugin writer could implement a stage.

There are two implementations of stages in the script:

  1. Using pipelined async generators
  2. Using the producer/consumer pattern with asynchronous queues

Async generators are "pull" based: By default, the upstream iterator does not run if the downstream iterator can't process input. I think this is a nice feature. Probably, async iterators are a little bit faster than the queue based implementation (units are passed directly between stages instead of put/get methods on queues).

In all other respects I like the queue based approach better:

  • producer/consumer fits well for the use case
  • I think it is easier to understand overall
  • Using non-blocking queue functions (e.g. get_nowait()) it is easy to collect all work that has accumulated upstream in one go.
  • adding (limited) buffers between stages is trivial (just set the max. queue size accordingly). (For async generators, one needs something like the backlog implementation in stage_concurrent to achieve the same.)
  • If required, it generalizes neatly to multiple streams (just use multiple queues, e.g. for error reporting)
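A condensed, illustrative sketch of the queue-based idea (not taken from the script; names and end-of-stream handling are simplified):

import asyncio

END = object()  # end-of-stream marker


async def stage(in_q, out_q, work):
    """One pipeline stage: pull items from in_q, process them, push them to out_q."""
    while True:
        item = await in_q.get()
        if item is END:
            await out_q.put(END)
            return
        await work(item)
        await out_q.put(item)


async def run_pipeline(items, workers, maxsize=100):
    """Wire queues between the stages; everything runs on one event loop."""
    queues = [asyncio.Queue(maxsize=maxsize) for _ in range(len(workers) + 1)]
    stages = [stage(i, o, w) for i, o, w in zip(queues, queues[1:], workers)]

    async def feed():
        for item in items:
            await queues[0].put(item)
        await queues[0].put(END)

    async def drain():
        while await queues[-1].get() is not END:
            pass

    await asyncio.gather(feed(), drain(), *stages)

# usage (Python 3.6):
# loop = asyncio.get_event_loop()
# loop.run_until_complete(run_pipeline(pending_units, [download, enrich, save]))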

@bmbouter: Regarding 'hiding' the event loop: I don't know how much code we need outside the event loop (e.g. for handling exceptions that bubble up, reporting and, if there is a use case for it, cancellation). If we need it, implementing it inside of PendingVersion makes sense. OTOH, having it outside gives the plugin writer more control about Tasks. I favor the 'inside' option, but have no strong opinion about it.

@jortel (Contributor) commented May 29, 2018

@jortel, that would work around the problem for this specific use case, but my concern is that the concurrency model of the Changesets is incompatible with the concurrency of the Downloaders and asyncio more broadly.

Incompatible how?

I don't see how this is a workaround. The proposal provides a solution for this use case and others like it. It provides an opportunity for plugin writers to specify one or more asyncio futures/coroutines used to do anything concurrently, like downloading additional metadata used to complete Content construction. What are the other needs not being met?

More content types will come, with more use cases that involve Downloaders and asyncio-based concurrency in various ways during stream processing. This convo has been super useful because I think we've uncovered an even larger problem than I realized. I believe we need to explore and possibly prototype an architectural solution.

What larger problem?

Also consider that if we don't fix this, plugin writers can't use anything else asyncio-based in the pipeline either (not just downloaders). For instance, this would block us from using asyncpg while benefiting from its concurrency (which is the reason to use it).

The ChangeSet deals with downloaders as asyncio.Futures so already supports anything asyncio based.

Why would plugin writer's use asyncpg?
How would they use it? Directly outside of django models? With models? Can asyncpg be integrated with django?

Also any of these things too. That's a lot of things to not be able to use during our stream processing.

The proposal supports plugin writer defined asyncio futures/coroutines. How would plugin writers be unable to use aio libs?

This is not unlike what @dkliban was alluding to. The Plugin API can provide the asyncio-compatible raw components of the pipeline and also some pre-built pipelines from those components.

Background:

Streams processing is synchronous and serial; aio is asynchronous and parallel. The ChangeSet provides the following streams processing pipeline stages:

  1. Generate PendingContent stage. The additions (generator) provided by plugin.
  2. Deduplicate Content stage. (ContentIterator)
  3. Deduplicate Artifact stage (ArtifactIterator)
  4. Download (AIO) stage. (DownloadIterator)
  5. Apply (DB changes). (apply main loop)

The AIO stage is (by design) asynchronous and concurrent. The Apply stage is executed when all of the concurrent tasks have completed. The AIO stage is where (in the proposal) plugin writers can contribute asyncio futures/coroutines and where streams processing and asynchronous processing flows are integrated.

Questions:

  1. What are the use cases being supported by offering the " asyncio-compatible raw components of the pipeline"?

  2. What are the use cases for plugin writers constructing custom pipelines that are different from the stages described in 1-5, given that additional, custom aio futures/coroutines used to complete Content construction could easily be supported by stage 4 as proposed?

  3. How would your proposed pipeline stages differ?

@bmbouter (Member, Author):

@gmbnomis Thanks so much for posting your script example. It looks like what I was thinking of also. Today I'm going to try to build a prototype on top of it that will provide the PendingVersion feature set only using stages as its design.

@bmbouter bmbouter closed this May 31, 2018
@dralley dralley changed the base branch from 3.0-dev to master May 31, 2018 22:28
@dralley dralley reopened this May 31, 2018
@bmbouter (Member, Author):

It took me a long time, but I finally finished the DeclarativeVersion code based on your prototype, @gmbnomis. It still needs a few things, and I'm tracking that work as part of Issue 3844.

A diagram of a proposed solution: https://i.imgur.com/7cEXC5e.png
pulp/pulp code changes: master...bmbouter:introducing-asyncio-stages
pulp/pulp_file code changes: pulp/pulp_file@master...bmbouter:introducing-asyncio-stages

@bmbouter (Member, Author):

This work was the inspiration for DeclarativeVersion. That code is now merged and available, so I'm closing this PR. Thanks for all the discussion!

@bmbouter bmbouter closed this Aug 21, 2018
@bmbouter bmbouter deleted the 3570-revised-changesets branch August 21, 2018 20:22