Session - Storage Layer #3277

Merged
merged 7 commits into mitmproxy:master from madt1m:session-flowcap on Aug 14, 2018

madt1m (Member) commented Aug 2, 2018:

Introduction

Here we are: this is a crucial step in the development of the Session addon. With that in mind, I would greatly appreciate any criticism, review, or comment that helps this piece of work meet the quality standard of the codebase.
This PR contains code to implement, in Session, the following features:

  • View (as an ordered list of flow ids);
  • View (re)ordering / (re)filtering;
  • Storage layer (both in memory, for flows not yet flushed, and on disk);
  • Logic to capture flows -> save them to the hot store -> periodically flush them to disk -> retrieve them;
  • Logic to detect updated flows already in state, and replace them appropriately in storage;
  • An initial strategy to adapt flush parameters to the incoming load (very much open to improvement).

Flow Lifecycle

I've sketched out how the logic behind the storage layer behaves.
I apologize for the DIY feel; I'll try to enrich the "diagram" (wink wink) with some comments below.

[diagram: flow lifecycle sketch]

Main Data Structures:

  • _view: a list of (flow.order, flow.id) tuples, sorted by the first element. It tracks which flows are in the current view and keeps them ordered.
  • hot_store: an ordered dictionary keyed on flow ids. It stores flows when they first arrive or when they are updated, and serves as the first layer of storage.
  • db_store: the class wrapping the sqlite database. Flows are periodically flushed here as protobuf blobs, and retrieved and loaded when another tool needs access to storage. (A rough sketch of these structures follows below.)
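In code, roughly (an illustrative sketch only: names, types and the table schema are assumptions, not the actual addon code):

import collections
import sqlite3
from typing import List, Tuple

# _view: (order_key, flow_id) tuples, kept sorted by order_key.
_view: List[Tuple[float, str]] = []

# hot_store: insertion-ordered mapping of flow id -> flow, holding
# flows that have not been flushed to disk yet.
hot_store: "collections.OrderedDict[str, object]" = collections.OrderedDict()

# db_store: thin wrapper around the sqlite database of protobuf blobs.
class DBStore:
    def __init__(self, path: str = ":memory:") -> None:
        self.con = sqlite3.connect(path)
        self.con.execute(
            "CREATE TABLE IF NOT EXISTS flow (id TEXT PRIMARY KEY, blob BLOB)"
        )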

Main Behaviour:

  • When a flow first arrives in storage - for example via a request() call - it is added to the hot store; then, if the filter matches, (flow.order, flow.id) - where the order key is generated from the currently set order - is inserted, in order, into the view.
  • If a captured flow is later detected to already be in storage, it is replaced both in storage (via the hot store) and in the view.
  • Flows in hot storage are periodically flushed to db_store, inserting new rows into the database (for new flows) or replacing existing ones (for updated flows).
  • When a tool needs access to storage (for example via load.view()), the hot store is checked first; if the flow is not there, the db store is queried. This lets us retrieve flows even if they have not been flushed yet, and always gives us the most up-to-date version of a flow. (A sketch of both paths follows below.)
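In rough, illustrative Python (function names, signatures and the db_store.get helper are assumptions for the sketch, not the PR's actual code):

import bisect
from typing import Callable, Dict, List, Optional, Tuple

def add_or_update(
    f,
    hot_store: Dict[str, object],
    view: List[Tuple[float, str]],
    matches: Callable[[object], bool],
    generate_order: Callable[[object], float],
) -> None:
    # Capture path: the hot store is always the first storage layer.
    updated = f.id in hot_store
    hot_store[f.id] = f
    if matches(f):
        if updated:
            # Drop the stale view entry before re-inserting it.
            view[:] = [t for t in view if t[1] != f.id]
        bisect.insort(view, (generate_order(f), f.id))

def load_flow(fid: str, hot_store: Dict[str, object], db_store) -> Optional[object]:
    # Retrieval path: prefer the hot store, so we always get the most
    # up-to-date version of a flow; otherwise fall back to disk.
    if fid in hot_store:
        return hot_store[fid]
    return db_store.get(fid)  # get() is a stand-in for the db wrapper's accessor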

Other features:

  • Some flow components cannot be serialized. Because of this, the Session DB keeps a dictionary of the "live" components of a flow when it is stored, and reassembles them upon retrieval.
  • As discussed previously, big flow contents are stored in a separate table. A body ledger enforces a one-to-one relationship between a flow and the body table, ensuring that only one of request and response content is stored.
  • Session implements a simple, initial strategy to handle bursts of flows: the flush period and flush rate are periodically tweaked -- if hot_store starts to grow, the flush period decreases and the flush rate increases (and the other way around when it shrinks). A sketch of this adjustment follows the list.
  • I have also written filtering and ordering routines, since those are strongly tied to flow capture and insertion. Their behaviour mimics that of the View addon.
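A sketch of the back-pressure adjustment mentioned above (the concrete factors and bounds are placeholders, not the values used in the PR):

from typing import Tuple

def tweak(hot_len: int, flush_period: float, flush_rate: int) -> Tuple[float, int]:
    if hot_len > flush_rate:
        # Backlog growing: flush sooner and write bigger batches.
        flush_period = max(0.5, flush_period * 0.9)
        flush_rate = int(flush_rate * 1.1)
    elif hot_len < flush_rate:
        # Load dropped again: relax back towards the defaults.
        flush_period = min(3.0, flush_period * 1.1)
        flush_rate = max(1, int(flush_rate * 0.9))
    return flush_period, flush_rate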

What's Next

Once the storage layer is solid, it will be time to implement an API which mirrors the View one, to test for interoperability and complete the layer. This will require some work outside the addon - as discussed some weeks ago - to extend command typing, handle blinker Signal subscription via the API, and implement some way to "plug in" storage modules, so that View can be replaced with Session during the testing phase (and hopefully afterwards).

During this piece of work, I wrote a fair amount of new code, and made lots of design choices in the process. Some of them could be good, some bad, so:

  • Let me know about anything that just doesn't click;
  • Let me know if you believe something in the logic or implementation doesn't work the way it is supposed to;
  • Full statement coverage doesn't necessarily mean the tests cover the right cases, so let me know if you believe I should test more use cases or modify the current suite.

word to the CI!

@madt1m madt1m force-pushed the madt1m:session-flowcap branch from 211560b to e9c2b12 Aug 3, 2018

cortesi (Member) left a comment:

This is an excellent patch, and will be a solid basis for our next steps. Let's do a round of tweaks before we merge. I'll summarise my review as follows:

  • Can you please extend the type annotations for the patch? There are a few places where a type annotation would really have clarified the intent of the code and made things easier to read.
  • I think the way we batch writes and tune the write parameters needs to be simplified. See comments in the review.
  • We go back to disk a bit too often for my liking. In particular, I think the use of __contains__ on the database has led to us traversing the whole DB in a number of places where we shouldn't.
  • We should reconsider the way we keep orders. At the moment, we reload all flows when the order changes. It seems to me that we can trade off a bit of memory to make sure we never have to go to disk unless the filter has changed.
    self._flush_rate *= 1.1
elif len(self._hot_store) < self._flush_rate:
    self._flush_period *= 1.1
    self._flush_rate *= 0.9

cortesi (Member) commented Aug 4, 2018:

Can we please extract any hard-coded parameters out into constants or tunable variables? Especially in magical portions of the codebase like this bit, I'd like to avoid hard-coded values.
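One possible shape, as an illustration (initial values copied from the diff; the constant names are mine):

# Tunable module-level defaults for the flush machinery.
FLUSH_PERIOD: float = 3.0   # seconds between write passes
TWEAK_PERIOD: float = 0.5   # seconds between parameter adjustments
FLUSH_RATE: int = 150       # flows written per batch
GROWTH_FACTOR: float = 1.1  # factors used when adapting period/rate
SHRINK_FACTOR: float = 0.9

__init__ would then just read self._flush_period = FLUSH_PERIOD and so on, and the tweak logic would multiply by GROWTH_FACTOR / SHRINK_FACTOR instead of literals.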

tof.append(self._hot_store.popitem(last=False)[1])
self.db_store.store_flows(tof)

async def _tweaker(self):

cortesi (Member) commented Aug 4, 2018:

I see what you're aiming at with tweaker, but I think we can do something simpler here. Say we exit our flush period sleep, and we discover we have > flush_rate flows. At the moment, we write flush_rate flows, and then we enter the sleep for the flush period again. We're presumably already suffering from a high flow rate, so this seems to be the wrong thing to do.

I think what we want to do instead is, every time we wake from the flush period sleep, to write batches of flows eagerly until we've exhausted our backlog, making sure we yield the coroutine between batches so other things can keep happening. The "tweaks" can be simpler too. We can keep the flush rate constant (in fact, it should probably be constant anyway), and if we're experiencing over-pressure we can just decrease the flush period.

The next simplification is that we can do away with the tweaker coroutine. We can recalculate the flush period when we wake up in the _writer - if we have more than 1 batch to write, we decrease the flush period, otherwise we reset it to the initial value.
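Something along these lines, perhaps (a sketch only, reusing the attribute and method names visible in the diff; the reset value and back-off factor are placeholders):

import asyncio

async def _writer(self):
    while True:
        await asyncio.sleep(self._flush_period)
        # Drain the whole backlog in flush_rate-sized batches, yielding
        # between batches so other coroutines keep running.
        batches = 0
        while self._hot_store:
            batch = []
            while self._hot_store and len(batch) < self._flush_rate:
                batch.append(self._hot_store.popitem(last=False)[1])
            self.db_store.store_flows(batch)
            batches += 1
            await asyncio.sleep(0)
        # No separate tweaker coroutine: adjust the period right here.
        if batches > 1:
            self._flush_period *= 0.9   # under pressure, wake up sooner
        else:
            self._flush_period = 3.0    # reset to the initial value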

self._flush_period = 3.0
self._tweak_period = 0.5
self._flush_rate = 150
self.started = False

cortesi (Member) commented Aug 4, 2018:

Could you please add type annotations for all these attributes? It really makes the code easier to read, especially when looking at subtle code like this.
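For example (types inferred from the initial values above; the class skeleton is just for illustration):

class Session:
    def __init__(self) -> None:
        self._flush_period: float = 3.0
        self._tweak_period: float = 0.5
        self._flush_rate: int = 150
        self.started: bool = False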

    self.filter = filt
    self._refilter()

def _base_add(self, f):

cortesi (Member) commented Aug 4, 2018:

This needs a better name - it's more like update_view


def _base_add(self, f):
    if not any([f.id == t[1] for t in self._view]):
        o = self._generate_order(f)

cortesi (Member) commented Aug 4, 2018:

Trivial point: extract the order generation, which is common to both branches of the if statement here.
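That is, something like (branch bodies elided; only the hoisted call matters here):

def _base_add(self, f):
    o = self._generate_order(f)  # computed once, used by both branches
    if not any(f.id == t[1] for t in self._view):
        ...  # insert (o, f.id) ordered into the view
    else:
        ...  # replace the existing view entry for f.id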

elif fid in self.db_store:
    ids_from_store.append(fid)
else:
    flows.append(None)

cortesi (Member) commented Aug 4, 2018:

What does it mean to append None to the list of flows here?

@@ -40,6 +73,16 @@ def __del__(self):
    if self.tempdir:
        shutil.rmtree(self.tempdir)

def __contains__(self, fid):

cortesi (Member) commented Aug 4, 2018:

I don't like the use of __contains__ here. This is an expensive operation, which goes to disk and retrieves IDs of all rows. Being able to use if x in db encourages casual use of this operation (see comments below) that we should actually use carefully.

We need to either make this operation cheap (keep an in-memory set of all IDs on disk or at least make it only retrieve one record), or make it a function like "has_id" so the user is a bit more prepared. I think having an in-memory set of IDs should be fine if this is an important operation.
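A sketch of the in-memory-set variant (class shape, method names and the serialize() call are placeholders, not the PR's API):

import sqlite3
from typing import Iterable, Set

class DBStore:
    def __init__(self, path: str = ":memory:") -> None:
        self.con = sqlite3.connect(path)
        self.con.execute(
            "CREATE TABLE IF NOT EXISTS flow (id TEXT PRIMARY KEY, blob BLOB)"
        )
        # Mirror of every id currently on disk, kept in sync on writes.
        self._ids: Set[str] = {
            row[0] for row in self.con.execute("SELECT id FROM flow")
        }

    def store_flows(self, flows: Iterable) -> None:
        self.con.executemany(
            "REPLACE INTO flow (id, blob) VALUES (?, ?)",
            [(f.id, f.serialize()) for f in flows],  # serialize() stands in for the protobuf dump
        )
        self._ids.update(f.id for f in flows)

    def __contains__(self, fid: str) -> bool:
        return fid in self._ids  # O(1), never touches the disk

The cheaper single-row query (SELECT 1 FROM flow WHERE id = ? LIMIT 1) would also work if keeping the set in sync turns out to be awkward.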

# The same flow may be in both hot and db storage at the same time. We want the most updated version.
if fid in self._hot_store:
    flows.append(self._hot_store[fid])
elif fid in self.db_store:

cortesi (Member) commented Aug 4, 2018:

It seems to me this check is unnecessary - the flow has to be either in the hot store or on disk? As noted above, membership checking for db_store is very expensive, so we shouldn't do it unless we have to.

def store_count(self):
    ln = 0
    for fid in self._hot_store.keys():
        if fid not in self.db_store:

cortesi (Member) commented Aug 4, 2018:

Same as above - the membership check here is really expensive.

if order != self.order:
    self.order = order
    newview = [
        (self._generate_order(f), f.id) for f in self.load_view()

cortesi (Member) commented Aug 4, 2018:

This seems to reload all flows from disk. Changing the order of flows on the fly is going to be a very important operation, and needs to be very snappy. I'd rather have _generate_order generate a dict with all the order keys, so we can do this once. Then we can make changing the order an operation that just changes the sort key and re-sorts.
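Roughly, something like this (the order_keys shape and the set_order signature are assumptions for the sketch):

from typing import Dict, List, Tuple

# flow id -> {order name: precomputed sort key}, filled when a flow is captured.
order_keys: Dict[str, Dict[str, float]] = {}

def set_order(order: str, view: List[Tuple[float, str]]) -> None:
    # Changing the order never goes back to disk: swap the sort key of the
    # flows already in the view and re-sort the in-memory list.
    view[:] = sorted((order_keys[fid][order], fid) for _, fid in view)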

@madt1m madt1m force-pushed the madt1m:session-flowcap branch from 4e0de43 to 1f12185 Aug 5, 2018

@madt1m madt1m force-pushed the madt1m:session-flowcap branch from 1f12185 to a524519 Aug 5, 2018

@cortesi cortesi merged commit a945b0c into mitmproxy:master Aug 14, 2018

4 checks passed

  • codecov/patch: 100% of diff hit (target 88.83%)
  • codecov/project: 88.98% (+0.14%) compared to e74e97b
  • continuous-integration/appveyor/pr: AppVeyor build succeeded
  • continuous-integration/travis-ci/pr: The Travis CI build passed