## Simple parallel processing in IPython

First, start the engines. In the notebook this is as easy as going to the dashboard, opening the Clusters tab, and starting as many engines as you want.

A reasonable upper bound is usually half the number of cores your Intel CPU advertises, because with Hyper-Threading half of them are only virtual (logical) cores. The best number can still depend strongly on what you actually do.
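The rule of thumb above can be written down as a small helper. This is only a sketch: `suggested_engine_count` is a hypothetical name, not part of IPython, and it assumes two logical cores per physical core, which does not hold on every machine.

```python
import os


def suggested_engine_count(logical_cores=None):
    """Return a starting guess for the number of engines.

    Hypothetical helper; assumes two logical cores per physical core.
    """
    if logical_cores is None:
        # os.cpu_count() can return None if the count cannot be determined
        logical_cores = os.cpu_count() or 1
    # Never suggest fewer than one engine
    return max(1, logical_cores // 2)


print(suggested_engine_count())
```

Treat the result as a starting point and benchmark with your own workload; I/O-heavy tasks may well benefit from more engines than this.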

Now I will create a `Client()` object, plus a direct view and a load-balanced view of the engines. With the direct view I have full control over which task goes to which engine, and it is a bit easier to inspect the available engines, hence the name 'direct' view.
The load-balanced view is an interface where I trust the machinery to efficiently distribute the work to any engine that is available. This view should be the better way to treat truly [embarrassingly parallel](http://en.wikipedia.org/wiki/Embarrassingly_parallel) problems.

In [None]:
from ipyparallel import Client
rc = Client()

In [None]:
dview = rc[:]
lview = rc.load_balanced_view()
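To see why handing out work dynamically can pay off when task durations vary, here is a toy model in plain Python. It is not ipyparallel's scheduler: it assumes the static case splits the task list into contiguous chunks up front, and that the dynamic case gives each task to whichever engine becomes free first. The function names and the durations are made up for illustration.

```python
import heapq


def static_makespan(durations, n_engines):
    """Split tasks into contiguous chunks up front, one chunk per engine."""
    base, extra = divmod(len(durations), n_engines)
    totals = []
    start = 0
    for i in range(n_engines):
        size = base + (1 if i < extra else 0)
        totals.append(sum(durations[start:start + size]))
        start += size
    # The run finishes when the busiest engine finishes
    return max(totals)


def dynamic_makespan(durations, n_engines):
    """Hand each task to whichever engine becomes free first."""
    free_at = [0] * n_engines  # time at which each engine is idle again
    heapq.heapify(free_at)
    for d in durations:
        t = heapq.heappop(free_at)
        heapq.heappush(free_at, t + d)
    return max(free_at)


durations = [8, 7, 1, 1, 1, 1, 1, 1]  # two slow tasks first (made-up numbers)
print(static_makespan(durations, 4))
print(dynamic_makespan(durations, 4))
```

In this toy case the static split puts both slow tasks on the same engine, while the dynamic assignment spreads them out. For tasks of equal duration the two strategies give the same finishing time.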

Let's check with the direct view that each engine has its own process id.

In [None]:
import os
dview.apply_sync(os.getpid)

The method `apply_sync` guarantees that I wait for the results to come back, whatever the currently configured default blocking behavior of the view is. I find it much better to use method names that tell me immediately whether I have to expect results or not, independent of some background flag that I would otherwise have to keep track of.

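Try the above call with the `apply_async` version, and you will see that you don't get the results back directly, because you are not waiting for them. Instead you immediately get an `AsyncResult` object, and calling its `get()` method waits for the actual results.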
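The same blocking versus non-blocking distinction can be tried without a running cluster. The sketch below uses the standard library's `concurrent.futures` as a stand-in, not ipyparallel itself: `submit` plays the role of `apply_async`, and `Future.result()` plays the role of `AsyncResult.get()`.

```python
import os
from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=2) as pool:
    # submit() returns immediately with a handle, not with the result
    future = pool.submit(os.getpid)
    print(type(future).__name__)

    # result() blocks until the call has finished and returns its value
    pid = future.result()
    print(pid)
```

Because this stand-in uses threads rather than separate engine processes, the returned process id is the one of the notebook kernel itself.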

In [None]:
import pandas as pd
pd.read_csv('/luna4/maye/rdr_out/verification/beta_0_elliptical/2012022408_C7_RDR_2.CSV').jdate.ptp()

In [None]:
%%px --local
def check_csv(c):
    import pandas as pd
    csv1 = pd.read_csv('/luna4/maye/rdr_out/verification/beta_0_elliptical/2012061905_C'+
                       str(c)+'_RDR_2.CSV')
    return csv1.jdate.ptp()

In [None]:
results = dview.map_async(check_csv, range(3,10))

In [None]:
for res in results:
    print(res)

In [None]:
csv1 = pd.read_csv('/luna4/maye/rdr_out/verification/beta_90_elliptical/2012061905_C7_RDR_2.CSV')

In [None]:
csv1.jdate.ptp()

In [None]:
csv1.c.describe()

In [None]:
from diviner import file_utils as fu
tstr = '2012061905'
obs = fu.DivObs(tstr)
rdrs = obs.get_rdrs()

In [None]:
from diviner import production
config = production.Configurator(run_name='beta_90_elliptical')

In [None]:
config.savedir

In [None]:
obs, rdr1, tb, rad = production.get_data_for_merge(tstr, config.savedir)

In [None]:
from diviner import ana_utils as au
prod = production
channel = au.Channel(7)
rdr1_merged = prod.melt_and_merge_rdr1(rdr1, channel.div)
tb_molten_c = prod.grep_channel_and_melt(tb, 'tb', channel, obs)
rad_molten_c = prod.grep_channel_and_melt(rad, 'radiance', channel, obs)

In [None]:
rdr1_merged.columns

In [None]:
rdr1_merged.jdate.ptp()

In [None]:
mergecols = 'index det'.split()
# Both frames use the same key columns, so `on` is sufficient
rdr2 = rdr1_merged.merge(tb_molten_c, on=mergecols)

In [None]:
rdr2.jdate.ptp()

In [None]:
prod.add_time_columns(rdr2)

In [None]:
rdr2.jdate.ptp()

In [None]:
rdr2.fillna(-9999, inplace=True)
rdr2.det = rdr2.det.astype('int')
rdr2.drop('index', inplace=True, axis=1)
rdr2['c'] = channel.div

In [None]:
rdr2.jdate.ptp()

In [None]:
clon_cols = rdr2.filter(regex="^clon_").columns
if config.swap_clons:
    for col in clon_cols:
        rdr2[col] = rdr2[col].map(lambda x: -(360 - x)
                                  if x > 180 else x)
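The lambda in the cell above moves longitudes greater than 180 degrees into the negative range, since `-(360 - x)` is the same as `x - 360`. A minimal standalone sketch of that mapping, with `wrap_longitude` as a hypothetical name that is not part of the `diviner` package:

```python
def wrap_longitude(lon):
    """Map a longitude in [0, 360] degrees to the (-180, 180] range."""
    # -(360 - lon) simplifies to lon - 360
    return lon - 360 if lon > 180 else lon


for lon in (0, 90, 180, 181, 270, 359.5):
    print(lon, '->', wrap_longitude(lon))
```

Values up to and including 180 are left untouched, so the function is safe to apply to columns that are already in the signed convention.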

In [None]:
rdr2.jdate.ptp()

In [None]:
%cd ~

In [None]:
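# Write the frame out, read it back, and compare the jdate range
rdr2.to_csv('jdate_test_out.csv', index=False)
csvtest = pd.read_csv('jdate_test_out.csv')
print(csvtest.jdate.ptp())
# Same round trip with the 'fast' writer engine
# (keyword kept from the original; it may not be accepted by every pandas version)
rdr2.to_csv('jdate_test_out.csv', index=False, engine='fast')
csvtest = pd.read_csv('jdate_test_out.csv')
print(csvtest.jdate.ptp())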
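The round-trip check in the last cell can be reproduced without the Diviner data. The sketch below uses only the standard library and made-up Julian dates; `csv_round_trip` and `ptp` are hypothetical helpers. It shows how the text format used for writing can change the peak-to-peak range of large floating point values, and makes no claim about what happens inside pandas.

```python
import csv
import io


def ptp(values):
    """Peak-to-peak range: maximum minus minimum."""
    return max(values) - min(values)


def csv_round_trip(values, fmt):
    """Write floats to an in-memory CSV using fmt, then read them back."""
    buf = io.StringIO()
    writer = csv.writer(buf)
    writer.writerow(['jdate'])
    for v in values:
        writer.writerow([fmt(v)])
    buf.seek(0)
    return [float(row['jdate']) for row in csv.DictReader(buf)]


# Made-up Julian dates, roughly one hour apart
jdates = [2456098.208333, 2456098.250001]

exact = csv_round_trip(jdates, repr)                  # repr() round-trips floats exactly
lossy = csv_round_trip(jdates, lambda v: '%.2f' % v)  # keeps only two decimals

print(ptp(jdates), ptp(exact), ptp(lossy))
```

If the range printed after a round trip differs from the one computed in memory, the number formatting used when writing is one of the first things worth checking.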