-
Notifications
You must be signed in to change notification settings - Fork 370
Feat: Introducing the GarbageCollector of local snapshots #995
Changes from 8 commits
d010e12
6a23d9d
4070be0
b0d2241
31dc09a
dc8aa3e
71e8683
7585d46
2fbdf9f
9df0a7f
defd3aa
3aa654f
327797a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,333 @@ | ||
package com.iota.iri.service.garbagecollector; | ||
|
||
import com.iota.iri.controllers.TipsViewModel; | ||
import com.iota.iri.service.snapshot.SnapshotManager; | ||
import com.iota.iri.storage.Tangle; | ||
import com.iota.iri.utils.thread.ThreadIdentifier; | ||
import com.iota.iri.utils.thread.ThreadUtils; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
import java.io.*; | ||
import java.nio.file.Files; | ||
import java.nio.file.Paths; | ||
import java.util.*; | ||
|
||
/** | ||
* This class represents the manager for the cleanup jobs that are issued by the {@link SnapshotManager} in connection | ||
* with local snapshots and eventually other parts of the code. | ||
* | ||
* It plans, manages and executes the cleanup jobs asynchronously in a separate thread so cleaning up does not affect | ||
* the performance of the other tasks of the node. | ||
*/ | ||
public class GarbageCollector { | ||
/** | ||
* The interval in milliseconds that the garbage collector will check if new cleanup tasks are available. | ||
*/ | ||
private static final int GARBAGE_COLLECTOR_RESCAN_INTERVAL = 10000; | ||
|
||
/** | ||
* Logger for this class allowing us to dump debug and status messages. | ||
*/ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this comment is extraneous. Not a big deal though if you leave it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I created a live template for this so i don't have to "reinvent" this every time i comment a logger :P While I agree that some comments are self explanatory i still think its nice to have comments on all methods and properties (just because it feels nice to have a class completely documented). Especially compared to the state IRI is in right now where nearly nothing is documented. Also see my comment regarding the "Tangle" property. |
||
private static final Logger log = LoggerFactory.getLogger(GarbageCollector.class); | ||
|
||
/** | ||
* Holds a reference to the {@link ThreadIdentifier} for the cleanup thread. | ||
* | ||
* Using a {@link ThreadIdentifier} for spawning the thread allows the {@link ThreadUtils} to spawn exactly one | ||
* thread for this instance even when we call the {@link #start()} method multiple times. | ||
*/ | ||
private final ThreadIdentifier cleanupThreadIdentifier = new ThreadIdentifier("Snapshot Garbage Collector"); | ||
|
||
/** | ||
* Holds a reference to the tangle instance which acts as an interface to the used database. | ||
*/ | ||
final Tangle tangle; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I usually prefer to not add a comment for fields that represent common services and just rely on the class javadocs (bad example here on my side since The reason is that such references appear a lot throughout the code and you will just have to repeat the same comment everywhere and then if necessary change in lots of places. What I do is to try to document everything that is public. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sometimes 3rd party developers are interested in just certain parts of the code and don't want to digest the whole project to understand how things are working. Having been a 3rd party developer myself not too long ago i would have really appreciated comments like that, that prevent me from having to analyze the Tangle class to understand what this thing is for (especially since "Tangle" it not really giving away what it is doing). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As a 3rd party developer didn't you usually use the docs hotkey on the IDE to display the docs without switching to the class? I agree that if you look over the code on github or a simple editor it helps. I personally want to add documentation to IRI classes I will refactor, but I will only add them to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. btw, |
||
|
||
/** | ||
* Holds a reference to the {@link SnapshotManager} that this garbage collector belongs to. | ||
*/ | ||
final SnapshotManager snapshotManager; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. private |
||
|
||
/** | ||
* Holds a reference to the {@link TipsViewModel} which is necessary for removing tips that were pruned. | ||
*/ | ||
final TipsViewModel tipsViewModel; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. private |
||
|
||
/** | ||
* A map of {@link JobParser}s allowing us to determine how to parse the jobs from the garbage collector state file, | ||
* based on their type. | ||
*/ | ||
private final HashMap<String, JobParser> jobParsers = new HashMap<>(); | ||
|
||
/** | ||
* A map of {@link QueueProcessor}s allowing us to process queues based on the type of the job. | ||
*/ | ||
private final HashMap<Class<? extends GarbageCollectorJob>, QueueProcessor> queueProcessors = new HashMap<>(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For example, this is a doc I like. |
||
|
||
/** | ||
* A map of {@link QueueConsolidator}s allowing us to consolidate the queues to consume less "space". | ||
*/ | ||
private final HashMap<Class<? extends GarbageCollectorJob>, QueueConsolidator> queueConsolidators = new HashMap<>(); | ||
|
||
/** | ||
* List of cleanup jobs that shall get processed by the garbage collector (grouped by their class). | ||
*/ | ||
private final HashMap<Class<? extends GarbageCollectorJob>, ArrayDeque<GarbageCollectorJob>> garbageCollectorJobs = new HashMap<>(); | ||
|
||
/** | ||
* The constructor of this class stores the passed in parameters for future use and restores the previous state of | ||
* the garbage collector if there is a valid one (to continue with cleaning up after IRI restarts). | ||
* | ||
* Before restoring the garbage collector state it registers the available job types, so we know how to parse and | ||
* process the found jobs. | ||
*/ | ||
public GarbageCollector(Tangle tangle, SnapshotManager snapshotManager, TipsViewModel tipsViewModel) { | ||
this.tangle = tangle; | ||
this.snapshotManager = snapshotManager; | ||
this.tipsViewModel = tipsViewModel; | ||
|
||
MilestonePrunerJob.registerInGarbageCollector(this); | ||
UnconfirmedSubtanglePrunerJob.registerInGarbageCollector(this); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I kinda got confused by this design. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Because i wanted to have all job specific things encapsulated in the job, rather than make the GarbageCollector make too many assumptions about the way the job works and what kind of methods it uses to work. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What confused me is that I expected the static Also it confuses me that There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's a method reference which is a more efficient way of writing a lambda (also much faster). "SomeClass::methodName" actually means "(myParam) -> SomeClass.methodName(myParam)" And since it gets referenced inside the Job, the job has access to that private function. What i want to achieve with this is: The GarbageCollector only needs to know which Jobs it supports - but it doesn't need to know anything about how the Job works and what requirements that job has - I still think this is a cleaner separation of logic than to hardcode all of the logic. into the GarbageCollector itself. If we want to change the way the MilestonePrunerJob works, we only need to touch this file and the GarbageCollector can stay untouched an generic. |
||
} | ||
|
||
/** | ||
* This method tries to restore the previous state of the Garbage Collector by reading the state file that get's | ||
* persisted whenever we modify any of the queues. | ||
* | ||
* It is used to restore the state of the garbage collector between IRI restarts and speed up the pruning | ||
* operations. If it fails to restore the state it just continues with an empty state which doesn't cause any | ||
* problems with future jobs other than requiring them to perform unnecessary steps and therefore slowing them down | ||
* a bit. | ||
*/ | ||
public void restoreCleanupJobs() { | ||
try { | ||
BufferedReader reader = new BufferedReader( | ||
new InputStreamReader( | ||
new BufferedInputStream( | ||
new FileInputStream(getStateFile()) | ||
) | ||
) | ||
); | ||
|
||
try { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why do you have this |
||
String line; | ||
while((line = reader.readLine()) != null) { | ||
String[] parts = line.split(";", 2); | ||
if(parts.length >= 2) { | ||
JobParser jobParser = this.jobParsers.get(parts[0]); | ||
if(jobParser == null) { | ||
throw new GarbageCollectorException("could not determine a parser for cleanup job of type " + parts[0]); | ||
} | ||
|
||
addJob(jobParser.parse(parts[1])); | ||
} | ||
} | ||
} finally { | ||
reader.close(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is fine but I think I want to enforce the use of |
||
} | ||
} | ||
catch(IOException e) { /* do nothing */ } | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why are you ignoring this? |
||
catch(Exception e) { | ||
log.error("could not load local snapshot file", e); | ||
} | ||
} | ||
|
||
/** | ||
* This method adds a job to the GarbageCollector, that will get processed on its next run. | ||
* | ||
* It first registers the garbage collector in the job, so the job has access to all the properties that are | ||
* relevant for its execution, and that do not get passed in via its constructor. Then it retrieves the queue based | ||
* on its class and adds it there. | ||
* | ||
* After adding the job to its corresponding queue we try to consolidate the queue and persist the changes in the | ||
* garbage collector state file. | ||
* | ||
* @param job the job that shall be executed | ||
* @throws GarbageCollectorException if anything goes wrong while adding the job | ||
*/ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just want to say that I really like this javadoc :-) |
||
public void addJob(GarbageCollectorJob job) throws GarbageCollectorException { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. maybe change name to addJobAndConsolidate? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This would imply that there would also be a method for just adding without consolidation. The consolidation is an internal mechanism of the Manager to reduce its memory and disk consumption - as the user of the Manager i don't care about how it performs its task - i just wannt to hand over the tasks and that's it. Maybe it get's a bit clearer if we seperate the interfaces from their implementation. |
||
job.registerGarbageCollector(this); | ||
|
||
ArrayDeque<GarbageCollectorJob> jobQueue = getJobQueue(job.getClass()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In You use ArrayDeque in practice but you don't care about it, and it can be easily replaced. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since i need things like addLast and so on I kind of need to be specific here. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Use |
||
jobQueue.addLast(job); | ||
|
||
// consolidating the queue is optional | ||
QueueConsolidator queueConsolidator = queueConsolidators.get(job.getClass()); | ||
if(queueConsolidator != null) { | ||
queueConsolidator.consolidateQueue(this, jobQueue); | ||
} | ||
|
||
persistChanges(); | ||
} | ||
|
||
/** | ||
* This method starts the cleanup {@link Thread} that asynchronously processes the queued jobs. | ||
* | ||
* This method is thread safe since we use a {@link ThreadIdentifier} to address the {@link Thread}. The | ||
* {@link ThreadUtils} take care of only launching exactly one {@link Thread} that is not terminated. | ||
*/ | ||
public void start() { | ||
ThreadUtils.spawnThread(this::cleanupThread, cleanupThreadIdentifier); | ||
} | ||
|
||
/** | ||
* Shuts down the background job by setting the corresponding shutdown flag. | ||
*/ | ||
public void shutdown() { | ||
ThreadUtils.stopThread(cleanupThreadIdentifier); | ||
} | ||
|
||
/** | ||
* This method resets the state of the GarbageCollector. | ||
* | ||
* It prunes the job queue and deletes the state file afterwards. One example of its usage is: Cleaning up the | ||
* remaining files after processing the unit tests. | ||
*/ | ||
public void reset() { | ||
garbageCollectorJobs.clear(); | ||
|
||
getStateFile().delete(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
} | ||
|
||
/** | ||
* This method allows to register a parser for a given job type. | ||
* | ||
* When we serialize the pending jobs to save the current state of the garbage collector, we also dump their class | ||
* name, which allows us to generically parse their serialized representation using the registered parser function | ||
* back into the corresponding job. | ||
* | ||
* @param jobClass class of the job that the GarbageCollector shall be able to handle | ||
* @param jobParser parser function for the serialized version of jobs of the given type | ||
*/ | ||
void registerParser(Class<?> jobClass, JobParser jobParser) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
this.jobParsers.put(jobClass.getCanonicalName(), jobParser); | ||
} | ||
|
||
/** | ||
* This method allows us to register a {@link QueueProcessor} for the given job type. | ||
* | ||
* Since different kinds of jobs get processed in a different way, we are able to generically process them based on | ||
* their type after having registered a processor for them. | ||
* | ||
* @param jobClass class of the job that the GarbageCollector shall be able to handle | ||
* @param queueProcessor function that takes care of processing the queue for this particular type | ||
*/ | ||
void registerQueueProcessor(Class<? extends GarbageCollectorJob> jobClass, QueueProcessor queueProcessor) { | ||
this.queueProcessors.put(jobClass, queueProcessor); | ||
} | ||
|
||
/** | ||
* This method allows us to register a {@link QueueConsolidator} for the given job type. | ||
* | ||
* Some jobs can be consolidated to consume less space in the queue by grouping them together or skipping them | ||
* completely. While the consolidation of multiple jobs into fewer ones is optional and only required for certain | ||
* types of jobs, this method allows us to generically handle this use case by registering a handler for the job | ||
* class that supports this feature. | ||
* | ||
* @param jobClass class of the job that the GarbageCollector shall be able to handle | ||
* @param queueConsolidator lambda that takes care of consolidating the entries in a queue | ||
*/ | ||
void registerQueueConsolidator(Class<? extends GarbageCollectorJob> jobClass, QueueConsolidator queueConsolidator) { | ||
this.queueConsolidators.put(jobClass, queueConsolidator); | ||
} | ||
|
||
/** | ||
* This method persists the changes of the garbage collector so IRI can continue cleaning up upon restarts. | ||
* | ||
* Since cleaning up the old database entries can take a long time, we need to make sure that it is possible to | ||
* continue where we stopped. This method therefore writes the state of the queued jobs into a file that is read | ||
* upon re-initialization of the GarbageCollector. | ||
* | ||
* @throws GarbageCollectorException if something goes wrong while writing the state file | ||
*/ | ||
void persistChanges() throws GarbageCollectorException { | ||
try { | ||
Files.write( | ||
Paths.get(getStateFile().getAbsolutePath()), | ||
() -> garbageCollectorJobs.entrySet().stream().flatMap( | ||
entry -> entry.getValue().stream() | ||
).<CharSequence>map( | ||
entry -> entry.getClass().getCanonicalName() + ";" + entry.serialize() | ||
).iterator() | ||
); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just a question (since I am not a functional type of guy), do you find this readable? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, this is how it usually look :D There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm, I am pretty sure since you're doing |
||
} catch(IOException e) { | ||
throw new GarbageCollectorException("could not persists garbage collector state", e); | ||
} | ||
} | ||
|
||
/** | ||
* This method contains the logic for the processing of the cleanup jobs, that gets executed in a separate | ||
* {@link Thread}. | ||
* | ||
* It repeatedly calls {@link #processCleanupJobs()} until the GarbageCollector is shutting down. | ||
*/ | ||
private void cleanupThread() { | ||
while(!Thread.interrupted()) { | ||
try { | ||
processCleanupJobs(); | ||
} catch(GarbageCollectorException e) { | ||
log.error("error while processing the garbage collector jobs", e); | ||
} | ||
|
||
ThreadUtils.sleep(GARBAGE_COLLECTOR_RESCAN_INTERVAL); | ||
} | ||
} | ||
|
||
/** | ||
* This method contains the logic for scheduling the jobs and executing them. | ||
* | ||
* It iterates through all available queues and executes the corresponding {@link QueueProcessor} for each of them | ||
* until all queues have been processed. | ||
* | ||
* @throws GarbageCollectorException if anything goes wrong while processing the cleanup jobs | ||
*/ | ||
private void processCleanupJobs() throws GarbageCollectorException { | ||
for(Map.Entry<Class<? extends GarbageCollectorJob>, ArrayDeque<GarbageCollectorJob>> entry : garbageCollectorJobs.entrySet()) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ArrayDeque = Queue There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since i need things like addLast and so on I kind of need to be specific here. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. hmm There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How do you release the jobs you already performed? |
||
if(Thread.interrupted()) { | ||
return; | ||
} | ||
|
||
QueueProcessor queueProcessor = queueProcessors.get(entry.getKey()); | ||
if(queueProcessor == null) { | ||
throw new GarbageCollectorException("could not determine a queue processor for cleanup job of type " + entry.getKey().getCanonicalName()); | ||
} | ||
|
||
queueProcessor.processQueue(this, entry.getValue()); | ||
} | ||
} | ||
|
||
/** | ||
* This method retrieves the job queue belonging to a given job type. | ||
* | ||
* It first checks if a corresponding queue exists already and creates a new one if no queue was created yet for the | ||
* given job type. | ||
* | ||
* @param jobClass type of the job that we want to retrieve the queue for | ||
* @return the list of jobs for the provided job type | ||
*/ | ||
private ArrayDeque<GarbageCollectorJob> getJobQueue(Class<? extends GarbageCollectorJob> jobClass) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
It is actually good to be specific as possible when you return. However, if you can decouple implementation it is always a good thing. |
||
if (garbageCollectorJobs.get(jobClass) == null) { | ||
synchronized(this) { | ||
if (garbageCollectorJobs.get(jobClass) == null) { | ||
garbageCollectorJobs.put(jobClass, new ArrayDeque<>()); | ||
} | ||
} | ||
} | ||
|
||
return garbageCollectorJobs.get(jobClass); | ||
} | ||
|
||
/** | ||
* This method returns a file handle to the local snapshots garbage collector file. | ||
* | ||
* It constructs the path of the file by appending the corresponding file extension to the | ||
* {@link com.iota.iri.conf.BaseIotaConfig#localSnapshotsBasePath} config variable. If the path is relative, it | ||
* places the file relative to the current working directory, which is usually the location of the iri.jar. | ||
* | ||
* @return File handle to the local snapshots garbage collector file. | ||
*/ | ||
private File getStateFile() { | ||
return new File(snapshotManager.getConfiguration().getLocalSnapshotsBasePath() + ".snapshot.gc"); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
package com.iota.iri.service.garbagecollector; | ||
|
||
/** | ||
* This class is used to wrap exceptions that are specific to the garbage collector logic. | ||
* | ||
* It allows us to distinct between the different kinds of errors that can happen during the execution of the code. | ||
*/ | ||
public class GarbageCollectorException extends Exception { | ||
/** | ||
* Constructor of the exception which allows us to provide a specific error message and the cause of the error. | ||
* | ||
* @param message reason why this error occurred | ||
* @param cause wrapped exception that caused this error | ||
*/ | ||
GarbageCollectorException(String message, Throwable cause) { | ||
super(message, cause); | ||
} | ||
|
||
/** | ||
* Constructor of the exception which allows us to provide a specific error message without having an underlying | ||
* cause. | ||
* | ||
* @param message reason why this error occurred | ||
*/ | ||
GarbageCollectorException(String message) { | ||
super(message); | ||
} | ||
|
||
/** | ||
* Constructor of the exception which allows us to wrap the underlying cause of the error without providing a | ||
* specific reason. | ||
* | ||
* @param cause wrapped exception that caused this error | ||
*/ | ||
GarbageCollectorException(Throwable cause) { | ||
super(cause); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So we don't get confused with Java GC, can you change the name?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any name suggestions?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TanglePruner?
(I am bad at names :-P)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I want to start using interfaces for classes that are not part of the domain model but execute business logic.
Currently this doesn't happen much in IRI but it should happen. We want to depend on abstractions not concretions (SOLID).
Just move all the public methods and javadocs to the interface please.
You can append "Impl" to the name of the implementing class.