# Main Aggregation Script - calls lab.py and crystals.py

In [1]:
# Reload the lab.py and crystals.py modules automatically so that saved changes take effect.
# If a new method or object is added, autoreload does not pick it up: save this notebook
# (make a 'checkpoint'), then close and halt the kernel before running again.

%load_ext autoreload
%autoreload 2

In [2]:
import ipas 
import numpy as np
import dask
from dask_jobqueue import SLURMCluster
from dask.distributed import Client, progress
from dask import delayed
from dask import dataframe as dd
import functools
import sys
import ast
from struct import *
import pickle
import glob
import random
import pandas as pd
import time
from dask.distributed import as_completed
from joblib import Parallel, delayed, parallel_backend
import matplotlib.pyplot as plt

In [3]:
# Describe the SLURM job that backs each Dask worker: one core and one worker
# process per job, submitted to the 'kratos' queue.
cluster = SLURMCluster(
    queue='kratos',
    walltime='04-23:00:00',  # presumably SLURM days-hours:minutes:seconds (4 days, 23 h) -- TODO confirm
    cores=1,
    memory='20000MiB', #1 GiB = 1,024 MiB
    processes=1)

# Adaptive scaling is left disabled; a fixed size of 20 is requested instead.
#cluster.adapt(minimum=3, maximum=20)
cluster.scale(20)

Port 8787 is already in use. 
Perhaps you already have a cluster running?
Hosting the diagnostics dashboard on a random port instead.


In [4]:
# Connect a distributed client to the SLURM-backed cluster created in the previous cell.
client = Client(cluster)

In [5]:
# Display the client summary (scheduler address, dashboard link, worker/core/memory counts).
client

0,1
Client  Scheduler: tcp://169.226.65.49:37721  Dashboard: http://169.226.65.49:34217/status,Cluster  Workers: 0  Cores: 0  Memory: 0 B


# Initialize databases for queries

In [6]:
# One SQLite URI per database file. glob returns paths in arbitrary order, so the
# paths are sorted to keep the file (and therefore row) order reproducible between runs.
files = ['sqlite:///'+f for f in sorted(glob.glob("db_files/IPAS_*_lastmono.sqlite"))]
# Tables that are read from every database file.
tables = ['aggregates', 'crystals']

In [7]:
%%time

# Collects one combined dask dataframe per entry of `tables`, in the same order.
df=[]
for table in tables:
    
    #read tables in parallel on client 
    # Each read is wrapped in dask.delayed so it is submitted to the cluster
    # instead of being executed in this process, one file at a time.
    read_files = [dask.delayed(dd.read_sql_table)(table=table, uri=file, index_col='id') for file in files]
    
    compute_read = client.compute(read_files)
    print('done with compute')
    # Bring the per-file dask dataframes back to this process.
    ddfs = client.gather(compute_read)
    print('done with gather')
    #concatenate all sqlite files vertically (axis=0 default) (same columns)
    # The list is scattered back to the cluster and dd.concat is run there;
    # .result() blocks until the combined dataframe is available.
    gathered_reads = client.scatter(ddfs)
    ddf = client.submit(dd.concat, gathered_reads).result()
    print('done with submit')
    #append combined dask df for each table
    df.append(ddf)


done with compute
done with gather
done with submit
done with compute
done with gather
done with submit
CPU times: user 3.43 s, sys: 279 ms, total: 3.7 s
Wall time: 34.1 s


In [8]:
# Join the combined 'aggregates' frame (df[0]) and 'crystals' frame (df[1]) column-wise.
# NOTE(review): dask warns on this call that the divisions are unknown, so the rows of
# the two frames are assumed to be aligned -- confirm that this holds for the data.
%time df_concat = dd.concat([df[0], df[1]], axis=1)

CPU times: user 3.77 ms, sys: 964 µs, total: 4.74 ms
Wall time: 4.95 ms


We're assuming that the indexes of each dataframes are 
 aligned. This assumption is not generally safe.
  "Concatenating dataframes with unknown divisions.\n"


In [9]:
# agg_r = (a**2 * c) ** (1/3), computed lazily per row.
# Item assignment is used on purpose: attribute assignment (df_concat.agg_r = ...)
# only writes a column when 'agg_r' already exists and otherwise just sets a
# Python attribute that is lost on the next dataframe operation.
df_concat['agg_r'] = np.power((np.power(df_concat.a, 2) * df_concat.c), (1./3.))

In [10]:
def query_r_5000(df):
    """Return the rows of ``df`` whose ``agg_r`` value is strictly below 5000."""
    keep = df["agg_r"] < 5000
    return df.loc[keep]

# Apply the agg_r < 5000 filter to every partition; df_concat is rebound to the filtered frame.
df_concat = df_concat.map_partitions(query_r_5000)
#len(df_concat) #86% of dataset

In [11]:
# Repartition the filtered frame with a 100MB target partition size and persist it,
# so that the later queries reuse the computed partitions.
df_repart = df_concat.repartition(partition_size="100MB").persist()
# Show how many partitions the repartitioned frame has.
df_repart.npartitions

48

# MAIN

In [12]:
# Run configuration flags.
ch_dist='gamma'         # any value other than 'gamma' uses the characteristic from the best-fitting distribution pdf (lowest SSE)
rand_orient = True  # randomly orient the seed crystal and the new crystal; uses the first random orientation
save_plots = False     # save all histograms to path, in folders named by number of aggs and minor/depth

# Note: if the warning filter below is left off, a run-time warning about the erlang
# distribution shape parameter may appear occasionally; it can be disregarded.
#warnings.filterwarnings("ignore")


In [13]:
def query_ncrystals(df_phi, r_bins):
    """Average ``ncrystals`` inside each consecutive pair of ``r_bins`` edges.

    Rows are selected with strict inequalities on ``agg_r`` (both bin edges are
    excluded) and the mean of every bin is evaluated eagerly via ``.compute()``.

    Returns a list with ``len(r_bins) - 1`` entries, one mean per bin.
    """
    def _bin_mean(lower, upper):
        # Rows strictly between the two edges of this bin.
        in_bin = df_phi[(df_phi.agg_r > lower) & (df_phi.agg_r < upper)]
        return in_bin.ncrystals.mean().compute()

    return [_bin_mean(lower, upper) for lower, upper in zip(r_bins[:-1], r_bins[1:])]

def concatenate_points(agg):
    """Collect the pickled points of an aggregate, one database row at a time.

    ``agg`` is read through ``.values[0]``, so it is expected to be a
    dataframe-like object with ``ncrystals``, ``agg_id``, ``r`` and ``phi``
    columns. Starting from the aggregate's own ``ncrystals``/``agg_id`` and
    counting both down by one per step until ``ncrystals`` drops below 2, the
    matching row is looked up in the module-level ``df_repart`` and its
    ``points`` blob is unpickled.

    Returns the concatenated points reshaped to 12 columns and converted to a
    structured array with float fields ``x``, ``y`` and ``z``.
    """
    def _load_points(n_crys, ident):
        # Same r/phi as the aggregate, at one specific (ncrystals, agg_id) step.
        same_agg = (df_repart.r == agg.r.values[0]) & (df_repart.phi == agg.phi.values[0])
        same_step = (df_repart.ncrystals == n_crys) & (df_repart.agg_id == ident)
        row = df_repart[same_agg & same_step].compute()
        # Only the first matching row is used.
        return pickle.loads(row.points.values[0])

    remaining = agg.ncrystals.values[0]
    current_id = agg.agg_id.values[0]
    collected = []
    while remaining >= 2:
        collected.append(_load_points(remaining, current_id))
        remaining -= 1
        current_id -= 1

    stacked = np.concatenate(collected)
    n_rows = int(np.shape(stacked)[0]/12)
    stacked = np.reshape(stacked, (n_rows, 12))
    return np.array(stacked, dtype=[('x', float), ('y', float), ('z', float)])

In [14]:
def concatenate_points_all(agg):
    """Build an ``ipas.Cluster_Calculations`` object holding all points of ``agg``.

    ``agg`` is read through plain attribute access (``agg.r``, ``agg.phi``,
    ``agg.ncrystals``, ``agg.agg_id``), e.g. a row from ``itertuples()``.
    A single query against the module-level ``df_repart`` fetches every row
    with the same ``r`` and ``phi`` whose ``ncrystals`` lies in
    ``[2, agg.ncrystals]`` and whose ``agg_id`` lies in
    ``[agg.agg_id - agg.ncrystals, agg.agg_id]``. The unpickled points of all
    those rows are flattened, reshaped to 12 columns and stored on the cluster
    as a structured array with float fields ``x``, ``y`` and ``z``.

    Returns the populated cluster object.
    """
    top_n = agg.ncrystals
    top_id = agg.agg_id

    same_agg = (df_repart.r == agg.r) & (df_repart.phi == agg.phi)
    n_in_range = (df_repart.ncrystals >= 2) & (df_repart.ncrystals <= top_n)
    id_in_range = (df_repart.agg_id <= top_id) & (df_repart.agg_id >= top_id-top_n)
    rows = df_repart[same_agg & n_in_range & id_in_range].compute()

    cluster = ipas.Cluster_Calculations(agg)

    # Flatten the per-row point collections into one list.
    flat_points = []
    for row in rows.itertuples():
        flat_points.extend(pickle.loads(row.points))

    n_rows = int(np.shape(flat_points)[0]/12)
    cluster.points = np.reshape(flat_points, (n_rows, 12))
    cluster.points = np.array(cluster.points, dtype=[('x', float), ('y', float), ('z', float)])

    return cluster

In [21]:
def main(n_phi_bins=20, n_r_bins=20, n_samples=301):
    """Sample aggregates per (agg_phi, agg_r) bin and run ipas.collect_clusters on each bin.

    The module-level ``df_repart`` frame is split into ``n_phi_bins`` quantile
    bins of ``agg_phi`` (rows with ``ncrystals > 2`` only); each of those is
    split into ``n_r_bins`` quantile bins of ``agg_r``. From every (phi, r) bin
    up to ``n_samples`` rows are drawn, each row is expanded with
    ``concatenate_points_all``, and one delayed ``ipas.collect_clusters`` call
    is queued per bin (using the module-level ``rand_orient``). All queued
    calls are then computed and gathered through the module-level ``client``.

    Parameters
    ----------
    n_phi_bins, n_r_bins : int, optional
        Number of quantile bins along ``agg_phi`` and ``agg_r``. The defaults
        (20 and 20) reproduce the previously hard-coded grid.
    n_samples : int, optional
        Maximum number of rows drawn from each bin (default 301). A bin with
        fewer rows contributes all of its rows instead of raising.

    Returns
    -------
    output : list
        Gathered ``ipas.collect_clusters`` results, one per (phi, r) bin in
        loop order.
    hold_clusters : numpy.ndarray
        Object array of shape ``(n_phi_bins, n_r_bins, n_samples)`` holding the
        sampled cluster objects; slots that were not filled stay ``None``.
    """
    output = []
    hold_clusters = np.empty((n_phi_bins, n_r_bins, n_samples), dtype=object)
    _, phi_bins = pd.qcut(df_repart.agg_phi.compute(), n_phi_bins, retbins=True)
    print(phi_bins)

    for i in range(len(phi_bins)-1):

        # Rows inside this agg_phi bin. The inequalities are strict, so rows
        # lying exactly on a bin edge are not selected.
        df_phi = df_repart[(df_repart.agg_phi > phi_bins[i]) & (df_repart.agg_phi < phi_bins[i+1]) &
                           (df_repart.ncrystals > 2)]
        # Break that agg_phi bin into quantile bins of agg_r.
        _, r_bins = pd.qcut(df_phi.agg_r.compute(), n_r_bins, retbins=True)
        print(r_bins)

        for r in range(len(r_bins)-1):
            print('i, r ',i, r)

            print('r = ', r_bins[r], r_bins[r+1])
            # df_phi is already restricted to ncrystals > 2, so only the
            # agg_r range needs to be applied here.
            df_r = df_phi[(df_phi.agg_r > r_bins[r]) & (df_phi.agg_r < r_bins[r+1])].compute()

            # DataFrame.sample raises when asked for more rows than exist
            # (without replacement), so cap the draw at the bin size.
            n_draw = min(n_samples, len(df_r))
            if n_draw < n_samples:
                print('only', n_draw, 'rows available in this bin; sampling all of them')
            samples = df_r.sample(n_draw)

            start_time = time.time()
            hold_clus = []
            for count, agg in enumerate(samples.itertuples()):
                agg_cluster = concatenate_points_all(agg)
                hold_clus.append(agg_cluster)
                hold_clusters[i, r, count] = agg_cluster
            print('time to concatenate all pts = ', (time.time()-start_time))

            # Queue the work lazily; nothing runs until client.compute below.
            output.append(dask.delayed(ipas.collect_clusters)(hold_clus, rand_orient=rand_orient))

    start_time = time.time()
    output = client.compute(output)
    output = client.gather(output)
    print('time to collect = ', (time.time()-start_time))
    print('done gathering!')

    return output, hold_clusters
    

In [22]:
# Run the full pipeline. `output` receives the gathered collect_clusters results and
# `hold_clusters` the sampled cluster objects indexed by (phi bin, r bin, sample).
# NOTE(review): the recorded run of this cell stopped with a KilledWorker error raised
# for a collect_clusters task.
output, hold_clusters = main() 

[7.62154416e-03 2.14812692e-01 2.72897208e-01 3.27197862e-01
 3.81496917e-01 4.35821645e-01 4.88461816e-01 5.41019280e-01
 5.98565918e-01 6.76668744e-01 1.72073512e+00 2.26164630e+00
 2.62465446e+00 2.93723248e+00 3.24091716e+00 3.54873330e+00
 3.89497104e+00 4.29933583e+00 4.81586711e+00 5.59733508e+00
 2.79829585e+01]
[1.93674364e+00 1.05012107e+01 1.86996848e+01 2.66389711e+01
 3.48745366e+01 4.39941508e+01 5.43105348e+01 6.71199507e+01
 9.19922160e+01 1.75719966e+02 2.58287903e+02 3.39623198e+02
 4.22946654e+02 5.13232928e+02 6.13709726e+02 7.40036407e+02
 9.50377065e+02 1.87726979e+03 3.21114847e+03 4.10955621e+03
 4.99991238e+03]
i, r  0 0
r =  1.9367436430512242 10.50121066456794
time to concatenate all pts =  39.520732402801514
i, r  0 1
r =  10.50121066456794 18.699684793928302
time to concatenate all pts =  40.23701572418213
i, r  0 2
r =  18.699684793928302 26.63897108330048
time to concatenate all pts =  39.606839656829834
i, r  0 3
r =  26.63897108330048 34.87453656091749


KilledWorker: ('collect_clusters-5be4283f-fecb-44e6-833a-7bbe72f73e76', <Worker 'tcp://169.226.65.52:46397', name: 13, memory: 0, processing: 115>)

In [23]:
# Close the distributed client; cells that use `client` cannot run after this without reconnecting.
client.close()

In [None]:
# Alternative to the dask path: run ipas.collect_clusters over `clusters` with joblib's
# loky backend (n_jobs=-1).
# NOTE(review): `delayed` resolves to joblib's here because it is imported after dask's
# `delayed`, and `clusters` is only assigned in a later cell (as an iterator over
# `output`) -- confirm the intended input before running this cell.
with parallel_backend("loky"):
    output = Parallel(n_jobs=-1)(delayed(ipas.collect_clusters)(cluster, rand_orient=rand_orient) for cluster in clusters)
    print(output)   

distributed.scheduler - CRITICAL - Tried writing to closed comm: {'op': 'lost-data', 'key': "('repartition-100000000-b9b6af723c5235062ffd2a9b234c9fb0', 28)"}
distributed.scheduler - CRITICAL - Tried writing to closed comm: {'op': 'lost-data', 'key': "('repartition-100000000-b9b6af723c5235062ffd2a9b234c9fb0', 28)"}
distributed.scheduler - CRITICAL - Tried writing to closed comm: {'op': 'lost-data', 'key': "('repartition-100000000-b9b6af723c5235062ffd2a9b234c9fb0', 11)"}
distributed.scheduler - CRITICAL - Tried writing to closed comm: {'op': 'lost-data', 'key': "('repartition-100000000-b9b6af723c5235062ffd2a9b234c9fb0', 11)"}
distributed.scheduler - CRITICAL - Tried writing to closed comm: {'op': 'lost-data', 'key': "('repartition-100000000-b9b6af723c5235062ffd2a9b234c9fb0', 36)"}
distributed.scheduler - CRITICAL - Tried writing to closed comm: {'op': 'lost-data', 'key': "('repartition-100000000-b9b6af723c5235062ffd2a9b234c9fb0', 36)"}
distributed.scheduler - CRITICAL - Tried writing to 

In [None]:
# Pickle the collected results. The context manager closes the file even if
# pickle.dump raises part-way through.
filename = 'instance_files/instance_db_aggagg_rand_returnclus'
with open(filename, 'wb') as filehandler:
    pickle.dump(output, filehandler)


In [None]:
# Pickle the sampled cluster objects. The context manager closes the file even
# if pickle.dump raises part-way through.
filename = 'instance_files/pulled_clusters_rand'
with open(filename, 'wb') as filehandler:
    pickle.dump(hold_clusters, filehandler)
print('finished!')

In [68]:
# Load previously pickled clusters; the context manager makes sure the file is closed.
# Unpickling can execute arbitrary code, so only load files produced by this project.
with open('instance_files/pulled_clusters_flat', 'rb') as file:
    b2 = pickle.load(file)

In [71]:
# List the ncrystals attribute of every unpickled object as a quick sanity check.
[i.ncrystals for i in b2]

[43, 3, 38]

In [40]:
# Run ipas.collect_clusters over the entries of `output`, keeping a small number
# of tasks in flight: whenever one finishes, the next entry (if any) is submitted.
start_time = time.time()

clusters = iter(output)
# Prime the pool with up to 4 tasks. zip(range(4), clusters) stops cleanly when
# `output` has fewer than 4 entries (a bare next() inside the comprehension would
# raise StopIteration) and never consumes an entry it does not submit.
futures = [client.submit(ipas.collect_clusters, list(batch), rand_orient=rand_orient)
           for _, batch in zip(range(4), clusters)]
ac = as_completed(futures)

b1=[]
for finished_future in ac:
    # Submit a replacement task only if there is still work left.
    try:
        next_batch = next(clusters)
    except StopIteration:
        pass
    else:
        print('submiting a new future')
        new_future = client.submit(ipas.collect_clusters, list(next_batch), rand_orient=rand_orient)
        ac.add(new_future)
    b1.append(finished_future.result())

print('time to collect = ', (time.time()-start_time))



KeyboardInterrupt: 

In [None]:
# Print the results collected by the as_completed loop.
print(b1)

In [34]:
if __name__ == '__main__':

    %time b1 = main() 

    filename = 'instance_files/instance_db_aggagg_flat'
    filehandler = open(filename, 'wb')
    %time pickle.dump(b1, filehandler)
    filehandler.close()
    print('finished!')

i, r  0 0
r =  1.7487839568275625 13.844468606238443
time to concatenate all pts =  1.5152053833007812
i, r  0 1
r =  13.844468606238443 23.236377210240864


KeyboardInterrupt: 

NameError: name 'b1' is not defined

finished!
