## Make a simple synthetic dataset to test hierarchical merge in FOF algorithm

#### the idea is this: 

* after the local FOF stage, each partition reports the particles it holds in the overlap region
* do a reduceByKey or treeAggregate of some sort to collect the groups belonging to the same particles
* produce a mapping of $G -> G_1$ and distribute to all hosts in form of broadcast lookup table

In [1]:
import numpy as np
import matplotlib.pylab as plt
%matplotlib inline

import sys
sys.setrecursionlimit(sys.getrecursionlimit()*10)
import matplotlib.patches as patches

plt.style.use('bmh')

In [2]:
import findspark
findspark.init()

In [3]:
def plot_rectangle(rec, ax=None):
    if ax is None: 
        ax = plt.subplot(aspect='equal')
    
    if isinstance(rec, (list, tuple)):
        for r in rec: 
            plot_rectangle(r,ax)
    
    else:
        size = (rec.maxes-rec.mins)
        ax.add_patch(patches.Rectangle(rec.mins, size[0], size[1], fill=False, zorder=-1))
    plt.draw()
    plt.show()

## Start Spark

In [4]:
import findspark
findspark.init()

In [5]:
import os
os.environ['SPARK_CONF_DIR'] = './conf'
os.environ['SPARK_DRIVER_MEMORY'] = '8G'

In [6]:
import pyspark
from pyspark import SparkContext, SparkConf
import pynbody

In [7]:
conf = SparkConf()

In [8]:
conf.set('spark.python.profile', 'true')
conf.set('spark.executor.memory', '8G')
conf.set('spark.driver.memory', '8G')
conf.set('spark.driver.maxResultSize', '5G')

<pyspark.conf.SparkConf at 0x2abe90363f50>

In [9]:
import subprocess, re

In [12]:
from IPython.display import HTML

out = subprocess.check_output(["bjobs","-o","job_name jobid"])
jobid = re.findall('spark-fof (\d+)', out)[0]

job_peek = subprocess.check_output(["bpeek", str(jobid)])

master_url, master_webui = re.findall('(spark://\S+:\d{4}|http://\S+:\d{4})', job_peek)

HTML("""<p>master running at %s</p>
     <p>Web UI available <a target='_blank' href='%s\'>here</a>
     """%(master_url,master_webui))

In [13]:
sc = SparkContext(master=master_url, conf=conf)

In [14]:
import spark_fof

### Make the base RDD

In [15]:
def convert_to_fof_particle(s): 
    p_arr = np.frombuffer(s, pdt_tipsy)
    
    new_arr = np.zeros(len(p_arr), dtype=pdt)
    new_arr['pos'] = p_arr['pos']
    
    return new_arr

In [16]:
def get_minmax(index, i): 
    mins = []
    maxs = []
    for arr in i: 
        mins.append(arr['pos'].min(axis=0))
        maxs.append(arr['pos'].max(axis=0))
    
    yield (index, (np.concatenate(mins).reshape(len(mins),3).min(axis=0), np.concatenate(maxs).reshape(len(mins),3).max(axis=0)))

In [17]:
from pyspark.accumulators import AccumulatorParam

class dictAdd(AccumulatorParam):
    def zero(self, value):
        return {i:0 for i in range(len(value))}
    def addInPlace(self, val1, val2): 
        for k, v in val2.iteritems(): 
            val1[k] += v
        return val1

In [18]:
from spark_fof.spark_fof_c import pdt

In [19]:
# function to map from file block numbers to domain bin
N = 62
map_file_to_domain = lambda (x,y,z): (x-1) + (y-1)*N + (z-1)*N*N

pdt_lc = np.dtype([('pos', 'f4', 3),('vel', 'f4', 3)])

import re
import time

get_block_ids = re.compile('blk\.(\d+)\.(\d+)\.(\d+)i')

def make_lc_rdd(sc, path):
    from glob import glob

    def set_particle_IDs_partition(index, iterator): 
        p_counts = partition_counts.value
        local_index = 0
        start_index = sum([p_counts[i] for i in range(index)])
        for arr in iterator:
            arr['iOrder'] = range(start_index + local_index, start_index + local_index + len(arr))
            local_index += len(arr)
            yield arr
    
    def read_file(index, i, chunksize=102400): 
        for filename in i:
            timein = time.time()
            with open(filename,'rb') as f: 
                header = f.read(62500)
                while True:
                    chunk = f.read(chunksize*24)
                    if len(chunk): 
                        p_arr = np.frombuffer(chunk, pdt_lc)
                        new_arr = np.zeros(len(p_arr), dtype=pdt)
                        new_arr['pos'] = p_arr['pos']
                        npart_acc.add({index: len(p_arr)})
                        yield new_arr
                    else: 
                        print 'reading %s took %d seconds in partition %d'%(filename, time.time()-timein, index)
                        break
                    
    files = glob('/cluster/home/roskarr/scratch/euclid/2Tlc-final/*/*')

    ids = map(lambda x: tuple(map(int, get_block_ids.findall(x)[0])), files)
    ids_map = {x:i for i,x in enumerate(ids)}
    ids_map_b = sc.broadcast(ids_map)
    nfiles = len(files)

    print 'Number of input files: ', nfiles
    
    # set the partition count accumulator
    npart_acc = sc.accumulator({i:0 for i in range(nfiles)}, dictAdd())
    
    rec_rdd = (sc.parallelize(zip(ids,files), numSlices=27).partitionBy(nfiles, lambda x: ids_map_b.value[x])
                 .values()
                 .mapPartitionsWithIndex(read_file))
    
    rec_rdd.count()
    partition_counts = sc.broadcast(npart_acc.value)
    
    return partition_counts, ids_map, rec_rdd.mapPartitionsWithIndex(set_particle_IDs_partition)

In [20]:
%%time 
p_counts, ids_map, p_rdd = make_lc_rdd(sc, '2Tlc-final/')
#p_rdd.cache()
#p_rdd.count()

Number of input files:  27
CPU times: user 30 ms, sys: 10 ms, total: 40 ms
Wall time: 1min 55s


In [21]:
%time limits = p_rdd.mapPartitionsWithIndex(get_minmax).collect()

CPU times: user 20 ms, sys: 0 ns, total: 20 ms
Wall time: 29.1 s


In [22]:
diff = (limits[0][1][1] - limits[0][1][0])[0]
global_min = -31*diff
global_max = 31*diff

diff, global_min, global_max

(0.033068776, -1.0251320600509644, 1.0251320600509644)

In [23]:
dom_maxs = np.array([global_max]*3, dtype=np.float64)
dom_mins = np.array([global_min]*3, dtype=np.float64)

In [24]:
reload(spark_fof.spark_fof)

<module 'spark_fof.spark_fof' from '/cluster/project/sis/ri/roskarr/spark-fof/spark_fof/spark_fof.pyc'>

In [25]:
#tau = diff*5./125.
tau = 0.2/12600
domain_containers = spark_fof.spark_fof.setup_domain(62, tau, dom_mins, dom_maxs)

In [26]:
global_to_cutout_map = {}
for k,v in ids_map.iteritems(): 
    global_to_cutout_map[map_file_to_domain(k)] = v

In [27]:
limits[0][1][0]

array([-0.06613756, -0.06613756, -0.06613756], dtype=float32)

### Run FOF and Merge groups

In [28]:
n_containers = len(domain_containers)

In [29]:
mins = np.zeros((n_containers, 3))
maxs = np.zeros((n_containers, 3))
mins_buff = np.zeros((n_containers, 3))
maxs_buff = np.zeros((n_containers, 3))

In [30]:
for i in range(n_containers):
    r = domain_containers[i]
    mins[i] = r.mins
    maxs[i] = r.maxes
    mins_buff[i] = r.bufferRectangle.mins
    maxs_buff[i] = r.bufferRectangle.maxes

In [31]:
fof_analyzer = spark_fof.spark_fof.FOFAnalyzer(sc, p_rdd, 64, 62, tau, dom_mins, dom_maxs, Npartitions=27)

wtf


In [32]:
### THIS MUST BE FIXED IN THE MAIN CODE
fof_analyzer.global_to_local_map = global_to_cutout_map

In [34]:
fof_analyzer.merged_rdd.cache()

PythonRDD[17] at RDD at PythonRDD.scala:48

In [35]:
group_merge_map = fof_analyzer.group_merge_map
gr_map_inv = {v:k for (k,v) in group_merge_map.iteritems()}
group_merge_map_b = sc.broadcast(group_merge_map)
gr_map_inv_b = sc.broadcast(gr_map_inv)

In [51]:
from itertools import izip
def count_groups_partition(particle_arrays, gr_map_inv_b, nMinMembers): 
    p_arr = np.concatenate(list(particle_arrays))
    gs, counts = np.unique(p_arr['iGroup'], return_counts=True)
    gr_map_inv = gr_map_inv_b.value
    return ((g,cnt) for g,cnt in izip(gs,counts) if (g in gr_map_inv) or (cnt >= nMinMembers))

In [37]:
len(group_merge_map)

880530

In [38]:
nMinMembers = fof_analyzer.nMinMembers

In [62]:
group_counts = fof_analyzer.merged_rdd.mapPartitions(lambda p_arrs: count_groups_partition(p_arrs, gr_map_inv_b, nMinMembers)).cache()

In [64]:
merge_group_counts = group_counts.filter(lambda (g,cnt): g in gr_map_inv_b.value).reduceByKey(lambda a,b: a+b).filter(lambda (g,cnt): cnt>=nMinMembers).cache()

In [65]:
merge_group_counts.count()

9450

In [66]:
merge_group_counts.take(10)

[(21490434162, 100),
 (60155101899, 169),
 (94511694672, 65),
 (64449021438, 135),
 (85916780082, 88),
 (30089807667, 240),
 (17207920584, 156),
 (85924514367, 373),
 (55860923052, 145),
 (73029520134, 164)]

In [67]:
total_group_counts = group_counts + merge_group_counts

In [71]:
total_group_counts.sortBy(lambda (k,v): -v).take(100)

[(77311066599, 865855),
 (85910801447, 865609),
 (4322239800, 698213),
 (47270917788, 653715),
 (4322239800, 595231),
 (10095789, 571398),
 (90223847363, 497635),
 (60133312412, 497502),
 (21485504954, 496181),
 (25781078975, 456429),
 (90223847363, 452018),
 (47270917788, 441531),
 (25797478932, 423419),
 (12884930629, 419587),
 (85908348645, 410652),
 (42975334250, 403781),
 (42975334250, 403377),
 (21479254542, 401019),
 (47250438119, 397850),
 (4324372234, 393099),
 (51560716916, 391677),
 (90221638460, 382890),
 (98801218074, 378490),
 (60142548292, 373651),
 (81617926698, 367391),
 (21481847521, 364710),
 (21479311567, 363305),
 (90222120073, 361688),
 (60149860007, 357280),
 (42965245322, 355301),
 (60149860007, 354047),
 (60146653219, 346119),
 (17207378830, 345465),
 (94515230820, 340983),
 (51555419047, 334509),
 (90221638460, 329533),
 (47244965812, 317144),
 (77330184973, 314216),
 (77330184973, 314203),
 (47247315769, 304443),
 (51555848133, 302304),
 (17194896711, 301114)

## Used for testing

In [None]:
def read_tipsy_output(filename, chunksize = 2048): 
    """
    Read a tipsy file and set the sequential particle IDs
    
    This scans through the data twice -- first to get partition particle counts
    and a second time to actually set the particle IDs.
    """
    
    # helper functions
    def convert_to_fof_particle_partition(index, iterator): 
        for s in iterator: 
            a = convert_to_fof_particle(s)
            if count: 
                npart_acc.add({index: len(a)})
            yield a

    def set_particle_IDs_partition(index, iterator): 
        p_counts = partition_counts.value
        local_index = 0
        start_index = sum([p_counts[i] for i in range(index)])
        for arr in iterator:
            arr['iOrder'] = range(start_index + local_index, start_index + local_index + len(arr))
            local_index += len(arr)
            yield arr
    
    rec_rdd = sc.binaryRecords(filename, pdt_tipsy.itemsize*chunksize)
    nPartitions = rec_rdd.getNumPartitions()
    # set the partition count accumulator
    npart_acc = sc.accumulator({i:0 for i in range(nPartitions)}, dictAdd())
    count=True
    # read the data and count the particles per partition
    rec_rdd = rec_rdd.mapPartitionsWithIndex(convert_to_fof_particle_partition)
    rec_rdd.count()
    count=False

    partition_counts = sc.broadcast(npart_acc.value)

    return rec_rdd.mapPartitionsWithIndex(set_particle_IDs_partition)

In [None]:
#p_rdd = read_tipsy_output('/Users/rok/polybox/euclid256.nat_no_header')

In [None]:
#p_rdd.cache().count()

In [None]:
# %%time 
# nMinMembers = 1
# n_groups = fof.run(ps, tau, nMinMembers)
# print 'number of groups to %d particle = %d'%(nMinMembers, n_groups)