diff --git a/TODO b/TODO
new file mode 100644
index 0000000..74de024
--- /dev/null
+++ b/TODO
@@ -0,0 +1,36 @@
+TODO
+===============
+
+* Web UI
+  - check key/values in any of the keyspaces
+  - simple browsing UI
+  = object API
+
+* Implementation Fixes
+  = fetches hang up gracefully
+  = partial packfile cache usage
+  - re-packcache removes old pack caches (at least the index entries)
+
+* Multi-node Test Environment
+  - multi-node project loading/updating
+  - benchmark clones
+  - x.github.com
+  - quick node setup
+  - install java, python
+  - copy cassandra tarball, conf files, start running it (god?)
+  - list of small, public projects to seed
+
+* Updating System
+  - seeding / updating population
+  - new github api to get all public repos, last update time / since update time
+  - get new/updated repos (from feed)
+  - clone/fetch on claiming node
+  - run update script
+  - re-packcache occasionally
+  - keep clone/fetch stats (total time, how many packed/loose objects)
+
+* Large project support
+  - split large blobs in object space on insert
+  - if the client split up blobs, be able to reconstruct and pack them
+  - split long revtree entries
+  - reconstruct long revtree entries (possibly done)
diff --git a/dulwich/agitmemnon.py b/dulwich/agitmemnon.py
index 33e3413..92d37a8 100644
--- a/dulwich/agitmemnon.py
+++ b/dulwich/agitmemnon.py
@@ -35,7 +35,8 @@ def __init__(self):
         port = 9160
         self.keyspace = 'Agitmemnon'
         self.memcache = {}
-
+        self.revtree = {}
+
         socket = TSocket.TSocket(host, port)
         transport = TTransport.TBufferedTransport(socket)
         protocol = TBinaryProtocol.TBinaryProtocol(transport)
@@ -78,8 +79,6 @@ def __contains__(self, sha):
         return False
 
     def __getitem__(self, name):
-        if name in self.memcache:
-            return self.memcache[name]
         o = self.get_object(name)
         data = ''
         otype = ''
@@ -90,11 +89,35 @@ def __getitem__(self, name):
             if col.name == 'type':
                 otype = col.value
         data = zlib.decompress(base64.b64decode(data))
-        shafile = ShaFile.from_raw_string(type_num_map[otype], data)
-        if otype != BLOB_ID: # caching commit/tree/tag objects since they are hit twice
-            self.memcache[name] = shafile
-        return shafile
+        return ShaFile.from_raw_string(type_num_map[otype], data)
+
+    def get_revtree_objects(self, sha):
+        # check for an entry in the revtree cache;
+        # if it's not there, pull another chunk and check again
+        # return all the objects included in that commit
+        if sha in self.revtree:
+            return self.revtree[sha]
+        else:
+            self.load_next_revtree_hunk()
+            if sha in self.revtree:
+                return self.revtree[sha]
+            else:
+                return False
+
+    def load_next_revtree_hunk(self):
+        if len(self.revtree) > 0: # hack
+            return False
+        o = self.get_super('RevTree', self.repo_name, 100000)
+        nilsha = '0000000000000000000000000000000000000000'
+        for col in o:
+            self.revtree[col.name] = []
+            for sup in col.columns:
+                objects = sup.value.split(":")
+                if nilsha in objects:
+                    objects.remove(nilsha)
+                if '' in objects:
+                    objects.remove('')
+                self.revtree[col.name].extend(objects)
 
     def find_common_revisions(self, graphwalker):
         """Find which revisions this store has in common using graphwalker."""
@@ -108,7 +131,7 @@ def find_common_revisions(self, graphwalker):
         return haves
 
     def find_missing_objects(self, haves, wants, progress=None):
-        return iter(MissingObjectFinder(self, haves, wants, progress).next, None)
+        return iter(AgitMissingObjectFinder(self, haves, wants, progress).next, None)
 
     def iter_shas(self, shas):
         """Iterate over the objects for the specified shas."""
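Note: the RevTree column family gives the server a reverse index from each commit SHA to the full list of object SHAs that commit introduces, so the missing-object walk never has to open trees. Below is a minimal standalone sketch of that lookup pattern, using a plain dict and a hypothetical fetch_hunk callable in place of the Cassandra get_super() paging; RevTreeCache and fetch_hunk are illustrative names, not part of the diff.

    # Standalone model of the revtree lookup cache (names here are made up;
    # the real store pages hunks out of Cassandra via get_super('RevTree', ...)).
    NIL_SHA = '0' * 40

    class RevTreeCache(object):
        def __init__(self, fetch_hunk):
            self.revtree = {}             # commit sha -> list of object shas
            self.fetch_hunk = fetch_hunk  # returns {sha: [value, ...]} or None

        def get_revtree_objects(self, sha):
            if sha in self.revtree:
                return self.revtree[sha]
            hunk = self.fetch_hunk()      # pull another chunk and retry
            if hunk:
                for commit, values in hunk.items():
                    objects = []
                    for value in values:
                        # values are colon-delimited sha lists; drop the null
                        # sha and the empty fields left by trailing colons
                        parts = value.split(':')
                        objects.extend(p for p in parts if p and p != NIL_SHA)
                    self.revtree[commit] = objects
            return self.revtree.get(sha, False)

    hunks = [{'c' * 40: ['a' * 40 + ':' + 'b' * 40 + ':']}]
    cache = RevTreeCache(lambda: hunks.pop(0) if hunks else None)
    print cache.get_revtree_objects('c' * 40)   # ['aaa...', 'bbb...']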
@@ -119,6 +142,37 @@ def fetch_objects(self, determine_wants,
                       graph_walker, progress):
         haves = self.find_common_revisions(graph_walker)
         return self.iter_shas(self.find_missing_objects(haves, wants, progress))
 
+    def partial_sender(self, objects, f, entries):
+        # PackCacheIndex (projectname) [(cache_key) => (list of objects/offset/size), ...]
+
+        sent = set()
+        objs = set()
+        for sha, path in objects.itershas():
+            objs.add(sha)
+
+        index = a.get('PackCacheIndex', self.repo_name)
+
+        # parse the cache index entries and figure out which caches we can pull
+        # (only caches whose objects are all needed can be used); entries are
+        # "sha:offset:size:base_sha\n"
+        for cache in index:
+            cacheobjs = set()
+            entries = cache.value.split("\n")
+            if '' in entries:
+                entries.remove('')
+            for entry in entries:
+                (sha, offset, size, ref) = entry.split(":")
+                cacheobjs.add(sha)
+            if len(cacheobjs - objs) == 0:
+                # pull this partial cache and send its raw pack data verbatim
+                data = self.get_value('PackCache', cache.name, 'data')
+                data = base64.b64decode(data)
+                f.write(data)
+                sent = sent | cacheobjs # record the objects this cache covered
+
+        return sent # the set of objects already written to the stream
+
     def get_refs(self):
         """Get dictionary with all refs."""
         print self.repo_name
@@ -128,8 +182,12 @@ def get_refs(self):
             x = x.super_column
             for col in x.columns:
                 if len(col.value) == 40:
-                    ret['refs/' + x.name + '/' + col.name] = col.value
+                    if x.name != 'meta':
+                        ret['refs/' + x.name + '/' + col.name] = col.value
                 if x.name == 'heads' and col.name == 'master':
+                    if 'HEAD' not in ret:
+                        ret['HEAD'] = col.value
+                if x.name == 'meta' and col.name == 'HEAD':
                     ret['HEAD'] = col.value
         return ret
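Note: the subset test above, len(cacheobjs - objs) == 0, is what makes a cached pack segment reusable: the cached bytes are streamed verbatim, so a cache may be sent only when every object in it is actually wanted. A small sketch of that selection rule with made-up index data follows; select_caches is a hypothetical helper, not part of the diff.

    # Cache selection rule used by partial_sender: stream a cached pack
    # segment only if it is a subset of the wanted objects, since single
    # entries cannot be dropped from the raw pack bytes.
    def select_caches(index, wanted):
        """Return (names of caches to stream, shas they cover)."""
        chosen, covered = [], set()
        for name, value in index.items():
            entries = [e for e in value.split("\n") if e]
            cacheobjs = set(e.split(":")[0] for e in entries)  # "sha:offset:size:base_sha"
            if not (cacheobjs - wanted):
                chosen.append(name)
                covered |= cacheobjs
        return chosen, covered

    index = {'cache1': 'aa:0:10:\nbb:10:20:\n',
             'cache2': 'cc:0:5:\n'}
    print select_caches(index, set(['aa', 'bb']))
    # (['cache1'], set(['aa', 'bb'])) -- cache2 holds an unwanted object

The rule is conservative by design: a cache holding even one unwanted object is skipped entirely rather than filtered, trading some duplicate-free precision for zero pack rewriting.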
+ """ + + def __init__(self, object_store, haves, wants, progress=None): + self.sha_done = set(haves) + self.objects_to_send = set([w for w in wants if w not in haves]) + self.object_store = object_store + if progress is None: + self.progress = lambda x: None + else: + self.progress = progress + + def add_todo(self, entries): + self.objects_to_send.update([e for e in entries if not e in self.sha_done]) + + def next(self): + if not self.objects_to_send: + return None + sha = self.objects_to_send.pop() + obs = self.object_store.get_revtree_objects(sha) + if obs: + self.add_todo(obs) + self.sha_done.add(sha) + self.progress("counting objects: %d\r" % len(self.sha_done)) + return (sha, sha) # sorry, hack + class AgitmemnonBackend(Backend): def __init__(self): @@ -148,9 +239,26 @@ def __init__(self): self.fetch_objects = self.repo.fetch_objects self.get_refs = self.repo.get_refs self.set_args = self.repo.set_args + self.partial_sender = self.repo.partial_sender + + +a = Agitmemnon() +#a.repo_name = 'fuzed2' +#a.load_next_revtree_hunk() +#print a.revtree +#index = a.get('PackCacheIndex', 'fuzed2') +#myset = set() +#for cache in index: +# print cache.name +# entries = cache.value.split("\n") +# if '' in entries: +# entries.remove('') +# for entry in entries: +# (sha, offset, size, ref) = entry.split(":") +# myset.add(sha) +# print myset -#a = Agitmemnon() #print a.get_object('7486f4075d2b9307d02e3905c69e28e456a51a32')[0].value #print a['7486f4075d2b9307d02e3905c69e28e456a51a32'].get_parents() #print a.get_object('7486f4075d2b9307d02e3905c69e28e456a51a32') diff --git a/dulwich/object_store.py b/dulwich/object_store.py index a551145..e21a388 100644 --- a/dulwich/object_store.py +++ b/dulwich/object_store.py @@ -421,6 +421,9 @@ def __init__(self, store, sha_iter): self.sha_iter = sha_iter self._shas = [] + def remove_objects(self, objs): + print "FU" + def __iter__(self): """Yield tuple with next object and path.""" for sha, path in self.itershas(): diff --git a/dulwich/pack.py b/dulwich/pack.py index e858fc0..fb9e947 100644 --- a/dulwich/pack.py +++ b/dulwich/pack.py @@ -818,7 +818,7 @@ def write_pack(filename, objects, num_objects): write_pack_index_v2(filename + ".idx", entries, data_sum) -def write_pack_data(f, objects, num_objects, window=10): +def write_pack_data(f, objects, num_objects, window=10, progress=None, backend=None): """Write a new pack file. :param filename: The filename of the new pack file. 
diff --git a/dulwich/object_store.py b/dulwich/object_store.py
index a551145..e21a388 100644
--- a/dulwich/object_store.py
+++ b/dulwich/object_store.py
@@ -421,6 +421,9 @@ def __init__(self, store, sha_iter):
         self.sha_iter = sha_iter
         self._shas = []
 
+    def remove_objects(self, objs):
+        print "FU"
+
     def __iter__(self):
         """Yield tuple with next object and path."""
         for sha, path in self.itershas():
diff --git a/dulwich/pack.py b/dulwich/pack.py
index e858fc0..fb9e947 100644
--- a/dulwich/pack.py
+++ b/dulwich/pack.py
@@ -818,7 +818,7 @@ def write_pack(filename, objects, num_objects):
     write_pack_index_v2(filename + ".idx", entries, data_sum)
 
 
-def write_pack_data(f, objects, num_objects, window=10):
+def write_pack_data(f, objects, num_objects, window=10, progress=None, backend=None):
     """Write a new pack file.
 
     :param filename: The filename of the new pack file.
@@ -826,24 +826,8 @@ def write_pack_data(f, objects, num_objects, window=10):
     :return: List with (name, offset, crc32 checksum) entries, pack checksum
     """
-    # this gets a list of all the objects - actual backend walker calls here
-    recency = list(objects)
-
-    # FIXME: Somehow limit delta depth
-    # FIXME: Make thin-pack optional (its not used when cloning a pack)
-
-    # Build a list of objects ordered by the magic Linus heuristic
-    # This helps us find good objects to diff against us
-    magic = []
-    for obj, path in recency:
-        magic.append( (obj.type, path, 1, -len(obj.as_raw_string()), obj) )
-    magic.sort()
-
-    # Build a map of objects and their index in magic - so we can find preceeding objects
-    # to diff against
-    offs = {}
-    for i in range(len(magic)):
-        offs[magic[i][4]] = i
+    if progress is None:
+        progress = lambda x: None
 
     # Write the pack
     entries = []
@@ -851,23 +835,19 @@ def write_pack_data(f, objects, num_objects, window=10):
     f.write("PACK")                          # Pack header
     f.write(struct.pack(">L", 2))            # Pack version
     f.write(struct.pack(">L", num_objects))  # Number of objects in pack
-    for o, path in recency:
-        sha1 = o.sha().digest()
-        orig_t = o.type
-        raw = o.as_raw_string()
-        winner = raw
-        t = orig_t
-        #for i in range(offs[o]-window, window):
-        #    if i < 0 or i >= len(offs): continue
-        #    b = magic[i][4]
-        #    if b.type != orig_t: continue
-        #    base = b.as_raw_string()
-        #    delta = create_delta(base, raw)
-        #    if len(delta) < len(winner):
-        #        winner = delta
-        #        t = 6 if magic[i][2] == 1 else 7
-        offset, crc32 = write_pack_object(f, t, winner)
-        entries.append((sha1, offset, crc32))
+
+    sent = set()
+    if backend and (num_objects > 500):
+        sent = backend.partial_sender(objects, f, entries)
+
+    shas = set()
+    for sha, path in objects.itershas():
+        shas.add(sha)
+
+    for sha in (shas - sent):
+        o = backend.repo[sha]
+        offset, crc32 = write_pack_object(f, o.type, o.as_raw_string())
+        entries.append((o.sha().digest(), offset, crc32))
     return entries, f.write_sha()
diff --git a/dulwich/server.py b/dulwich/server.py
index c25e1bc..13ae7b2 100644
--- a/dulwich/server.py
+++ b/dulwich/server.py
@@ -172,7 +172,7 @@ def next(self):
         progress("counting objects: %d, done.\n" % len(objects_iter))
         progress("GitHub is collecting your data\n")
         write_pack_data(ProtocolFile(None, write), objects_iter,
-                        len(objects_iter))
+                        len(objects_iter), 10, progress, self.backend)
         progress("Have a nice day!\n")
         # we are done
         self.proto.write("0000")
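Note: with the delta window stripped out, the stream write_pack_data produces is just the 12-byte pack header, one undeltified (type, zlib data) entry per object not already covered by a cached segment, and a SHA-1 trailer over everything written. For reference, a self-contained sketch of that layout; object_header and write_pack_stream are illustrative names, not dulwich APIs (dulwich's write_pack_object does the real entry encoding).

    import struct, zlib, hashlib
    from cStringIO import StringIO

    def object_header(type_num, size):
        # variable-length pack entry header: 4 bits of type, then the size in
        # 4 + 7n bits, with the high bit of each byte as a continuation flag
        byte = (type_num << 4) | (size & 0x0f)
        size >>= 4
        out = []
        while size:
            out.append(chr(byte | 0x80))
            byte = size & 0x7f
            size >>= 7
        out.append(chr(byte))
        return "".join(out)

    def write_pack_stream(f, objects):
        """Write header, undeltified entries, and the SHA-1 trailer;
        objects is a list of (type_num, raw_string) pairs."""
        digest = hashlib.sha1()
        def emit(data):
            digest.update(data)
            f.write(data)
        emit("PACK")                           # magic
        emit(struct.pack(">L", 2))             # pack version 2
        emit(struct.pack(">L", len(objects)))  # number of objects
        for type_num, raw in objects:
            emit(object_header(type_num, len(raw)))
            emit(zlib.compress(raw))           # entry body is a zlib stream
        f.write(digest.digest())               # trailing checksum

    buf = StringIO()
    write_pack_stream(buf, [(3, "hello world\n")])  # a single blob (type 3)
    print len(buf.getvalue())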