
Convert aggregation of PDFs to Charm++ reductions

This commit finishes #72. In walker, both the estimation of statistics and the
estimation of PDFs now use Charm++ reductions instead of Integrators sending
their contributions to Distributor for final assembly via vanilla entry method
calls. This is a much more efficient reduction, because Charm++ reductions
aggregate individual contributions on each PE and use a tree-based strategy
among PEs. Furthermore, this makes the difference between overloading the
messaging system at high core counts (as discussed in 6aa75ba) and not
overloading it, and thus being able to carry on with the simulation.
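
For orientation, the general shape of the pattern this commit moves to is
sketched below. This is a minimal, hypothetical example, not the walker code:
the names Worker, Host, finish(), and hostProxy are made up for illustration,
while contribute(), CkCallback, and CkReduction::sum_double are the Charm++
API also used in the diff below.

```
// Each worker chare calls contribute() instead of sending its partial
// result to the host via a point-to-point entry method call. Charm++
// first combines the contributions of all chares on a PE, then combines
// the per-PE results along a spanning tree, and finally delivers the
// single reduced result to the callback target.
//
// Assumed .ci declarations:
//   chare Host { entry void finish( CkReductionMsg* m ); ... };
//   array [1D] Worker { entry void work(); ... };

void Worker::work() {
  double partial = 1.0;                       // this chare's contribution
  CkCallback cb( CkIndex_Host::finish(nullptr), hostProxy );
  contribute( sizeof(double), &partial, CkReduction::sum_double, cb );
}

void Host::finish( CkReductionMsg* m ) {
  double total = *static_cast< double* >( m->getData() );  // reduced sum
  CkPrintf( "reduced sum: %g\n", total );
  delete m;     // the receiver owns the reduction message
}
```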

Converting the estimation of PDFs to Charm++ reductions necessitated custom
Charm++ reducers and some help from the Charm++ developers. See the email
thread starting at
http://lists.cs.uiuc.edu/pipermail/charm/2015-July/002106.html, and in
particular, http://lists.cs.uiuc.edu/pipermail/charm/2015-July/002114.html,
which contains a simplified example of how the Charm++ serialization can be
implemented in conjunction with a custom reducer on non-POD types. While the
example on the mailing list by Nikhil shows the basic principle, our
implementation applies it to estimating the histograms of a user-defined
number of PDFs, containing uni-, bi-, and tri-variate, ordinary and central
PDFs, and any of their combinations.
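
The custom reducer itself is tk::mergePDF in the new PDFUtil.C below; the one
piece not visible in this excerpt is its registration. The standard Charm++
pattern for that is sketched here; the routine name registerPDFMerger and its
placement are assumptions, while CkReduction::addReducer() and the PDFMerger
handle (used by Collector below) are real.

```
// A reducer function must be registered on every logical node before it
// can be passed to contribute(), hence an initnode declaration in the
// .ci file:
//   initnode void registerPDFMerger( void );

CkReduction::reducerType PDFMerger;   // handle later passed to contribute()

void registerPDFMerger( void ) {
  // tk::mergePDF has the required reducer signature:
  //   CkReductionMsg* (*)( int nmsg, CkReductionMsg** msgs )
  PDFMerger = CkReduction::addReducer( tk::mergePDF );
}
```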

A weak scaling study is underway to test the implementation, now with the
estimation of statistics AND PDFs. We use two series: a small and a large data
set. The small one uses 0.011642% of the available memory, while the large one
uses roughly 75% of the available memory for all runs. Details of the two
series are as follows:

small data (using 0.011642% of memory), wall-clock time of iterations 900->1000
    nn(1)      nc(2)     nored(3)     red(4)  red+skippdf(5)  red+pdf(6)
        1         24         0:04       0:19            0:09        0:50
       10        240         1:13       0:26            0:09        1:06
       50       1200         1:23       0:41            0:04        1:13
      100       2400         3:23       1:54            0:04        1:20
      500      12000     canceled   overload            0:06        1:23
     1000      24000 only 4 steps   overload            0:11        1:46

large data (using 75% of memory)
    nn(1)      nc(2)  red+pdf(9->10nopdf)(3)  red+pdf(10->11pdf)(4)
        1         24                    1:46                   8:38
       10        240                    1:54                  14:26
       50       1200                    1:56                  14:28
      100       2400                    1:56                  14:20
      500      12000                    2:00                   9:34
     1000      24000                    2:27                  11:26

The control files, now adding PDF estimation, for the small and large data sets
are as follows. (In the statistics and pdfs blocks an upper-case variable,
e.g., Y1, requests an ordinary quantity, while a lower-case one, e.g., y1,
requests a central one, i.e., a fluctuation about the mean.)

```
=========== small data set (dir1.q) ==========
title "Dirichlet for the IJSA paper"

walker
  term  140.0   # Max time
  dt    0.05    # Time step size
  npar  1000000   # Number of particles
  ttyi  100    # TTY output interval

  rngs
    mkl_mrg32k3a seed 0 end
  end

  dirichlet     # Select Dirichlet SDE
    depvar y
    init zero
    coeff const
    ncomp 2  # = K = N-1
    b     0.1    1.5 end
    S     0.625  0.4 end
    kappa 0.0125 0.3 end
    rng mkl_mrg32k3a
  end

  statistics
    <Y1>
    <Y2>
    <y1y1>
    <y2y2>
    <y1y2>
  end

  pdfs
    interval 10
    filetype txt
    policy overwrite
    centering elem
    format scientific
    precision 4
     p1_1( Y1 : 1.0e-2 )
     p2_1( Y2 : 1.0e-2 )
     p3_1( y1 y2 : 1.0e-2 1.0e-2 )
     p4_1( y1 y2 y1 : 1.0e-2 1.0e-2 1.0e-2 )
     p5_1( y1 : 1.0e-2 )
     p6_1( y2 : 1.0e-2 )
     p7_1( Y1 Y2 : 1.0e-2 1.0e-2 )
     p8_1( Y1 Y2 y1 : 1.0e-2 1.0e-2 1.0e-2 )
     p9_1( Y1 : 1.0e-2 )
    p10_1( Y2 : 1.0e-2 )
    p11_1( y1 y2 : 1.0e-2 1.0e-2 )
    p12_1( y1 y2 y1 : 1.0e-2 1.0e-2 1.0e-2 )
    p13_1( Y1 : 1.0e-2 )
    p14_1( Y2 : 1.0e-2 )
    p15_1( y1 y2 : 1.0e-2 1.0e-2 )
    p16_1( y1 y2 y1 : 1.0e-2 1.0e-2 1.0e-2 )
  end

end
==============================================
```

```
=========== large data set (dir1.q) ==========
title "Dirichlet for the IJSA paper"

walker
  term  1.0   # Max time
  dt    0.05    # Time step size
  npar  3000000000   # Number of particles
  ttyi  1    # TTY output interval

  rngs
    mkl_mrg32k3a seed 0 end
  end

  dirichlet     # Select Dirichlet SDE
    depvar y
    init zero
    coeff const
    ncomp 2  # = K = N-1
    b     0.1    1.5 end
    S     0.625  0.4 end
    kappa 0.0125 0.3 end
    rng mkl_mrg32k3a
  end

  statistics
    <Y1>
    <Y2>
    <y1y1>
    <y2y2>
    <y1y2>
  end

  pdfs
    interval 10
    filetype txt
    policy overwrite
    centering elem
    format scientific
    precision 4
     p1_1( Y1 : 1.0e-2 )
     p2_1( Y2 : 1.0e-2 )
     p3_1( y1 y2 : 1.0e-2 1.0e-2 )
     p4_1( y1 y2 y1 : 1.0e-2 1.0e-2 1.0e-2 )
     p5_1( y1 : 1.0e-2 )
     p6_1( y2 : 1.0e-2 )
     p7_1( Y1 Y2 : 1.0e-2 1.0e-2 )
     p8_1( Y1 Y2 y1 : 1.0e-2 1.0e-2 1.0e-2 )
     p9_1( Y1 : 1.0e-2 )
    p10_1( Y2 : 1.0e-2 )
    p11_1( y1 y2 : 1.0e-2 1.0e-2 )
    p12_1( y1 y2 y1 : 1.0e-2 1.0e-2 1.0e-2 )
    p13_1( Y1 : 1.0e-2 )
    p14_1( Y2 : 1.0e-2 )
    p15_1( y1 y2 : 1.0e-2 1.0e-2 )
    p16_1( y1 y2 y1 : 1.0e-2 1.0e-2 1.0e-2 )
  end

end
==============================================
```

Discussion:

- The first column in both tables is the number of compute nodes.

- The second column in both tables is the number of CPU cores (24/node).

- The first table corresponds to the small data set; its first 5 columns are a
  verbatim copy from commit 6aa75ba. The sixth column is the data measured
  with the changes this commit applies. Thus the sixth column corresponds to
  using Charm++ reductions for both statistics and PDF estimation with the
  small data set.

- First table, sixth column: it is apparent that the simulation can now finish
  without overloading the message buffers.

- Comparing the timings of the fifth and the sixth columns in the first table
  shows that estimating PDFs can be costly, even though PDFs for these runs
  were only estimated at every 10th time step. That is, PDF estimation happened
  10 times between iterations 900 and 1000, the interval whose duration is
  measured and displayed in the sixth column.

- Comparing the timings of the fifth and the sixth columns in the first table
  also shows that enabling the estimation of PDFs is not only computationally
  expensive but also degrades the weak scaling performance somewhat.

- The second table contains the timings for the large data set. The third
  column is measured from iteration 9 to 10, during which no PDFs were
  estimated; the fourth column is measured from iteration 10 to 11, during
  which PDFs were estimated.

- Excluding the first row, which involves no network communication, shows that
  both with and without estimating PDFs the weak scaling is at its theoretical
  maximum if the data set we operate on is large enough; see the definition of
  weak-scaling efficiency after this list.

- There is considerable variability in the last column for the large data set.
  Since the timings decrease with larger core counts and larger loads, I
  attribute this variability to external effects on the machine, e.g., the
  last two rows were obtained a day later than the rest of the simulations,
  possibly with more favorable network conditions.
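
For reference, ideal weak scaling means constant wall-clock time as the core
count grows with the work per core held fixed; in terms of efficiency:

```
% Weak-scaling efficiency, work per core held fixed as the core count N grows;
% N_0 is the smallest run involving network communication (nn=10 above).
E(N) = \frac{t(N_0)}{t(N)}, \qquad \text{ideal weak scaling: } E(N) \approx 1
```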
jbakosi committed Jul 22, 2015
1 parent 3e896df commit d5304e786d63083b33e0c24cdef25431233818f8
@@ -2,7 +2,7 @@
 /*!
   \file src/Base/PUPUtil.h
   \author J. Bakosi
-  \date Mon 01 Jun 2015 02:10:29 PM MDT
+  \date Tue 21 Jul 2015 08:44:55 AM MDT
   \copyright 2012-2015, Jozsef Bakosi.
   \brief Charm++ Pack/UnPack utilities
   \details This file contains some extensions to Charm++'s Pack/UnPack
@@ -40,10 +40,12 @@ namespace PUP {
 #pragma GCC diagnostic ignored "-Wuninitialized"
 #endif
 
-//! Pack/Unpack enum class. In Charm++ usually both the pup() overload and an
-//! overload for operator| are defined for all serializable types. However, we
-//! cannot define operator| for enum class as it would conflict with Charm++'s
-//! catch-all, template< class T > inline void operator|(PUP::er &p,T &t) {...}.
+//! \brief Pack/Unpack enum class.
+//! \details In Charm++ usually both the pup() overload and an overload for
+//!   operator| are defined for all serializable types. However, we cannot
+//!   define operator| for enum class as it would conflict with Charm++'s
+//!   catch-all, template< class T > inline void operator|(PUP::er &p,T &t)
+//!   {...}.
 //! \param[in] p Charm++'s pack/unpack object
 //! \param[in] e Enum class to pack/unpack
 //! \author J. Bakosi
@@ -4,9 +4,10 @@ project(Statistics CXX)
 
 add_library(Statistics
   Statistics.C
+  PDFUtil.C
 )
 
-removeWarnings( "Statistics.C" )
+removeWarnings( "Statistics.C;PDFUtil.C" )
 
 INSTALL(TARGETS Statistics
   RUNTIME DESTINATION bin COMPONENT Runtime
@@ -0,0 +1,116 @@
//******************************************************************************
/*!
  \file src/Statistics/PDFUtil.C
  \author J. Bakosi
  \date Mon 01 Jun 2015 02:52:59 PM MDT
  \copyright 2012-2015, Jozsef Bakosi.
  \brief PDF utilities
  \details PDF utilities.
*/
//******************************************************************************

#include "PDFUtil.h"
#include "Make_unique.h"

namespace tk {

std::pair< int, std::unique_ptr<char[]> >
serialize( const std::tuple< std::vector< tk::UniPDF >,
                             std::vector< tk::BiPDF >,
                             std::vector< tk::TriPDF > >& pdf )
//******************************************************************************
// Serialize vectors of PDFs to raw memory stream
//! \param[in] pdf Tuple of vectors of uni-, bi-, and tri-variate PDFs
//! \return Pair of the length and the raw stream containing the serialized PDFs
//! \author J. Bakosi
//******************************************************************************
{
  // Prepare for serializing PDFs to a raw binary stream, compute size
  PUP::sizer sizer;
  sizer | const_cast< std::vector<tk::UniPDF>& >( std::get<0>(pdf) );
  sizer | const_cast< std::vector<tk::BiPDF>& >( std::get<1>(pdf) );
  sizer | const_cast< std::vector<tk::TriPDF>& >( std::get<2>(pdf) );

  // Create raw character stream to store the serialized PDFs
  std::unique_ptr<char[]> flatData = tk::make_unique<char[]>( sizer.size() );

  // Serialize PDFs, each message will contain a vector of PDFs
  PUP::toMem packer( flatData.get() );
  packer | const_cast< std::vector<tk::UniPDF>& >( std::get<0>(pdf) );
  packer | const_cast< std::vector<tk::BiPDF>& >( std::get<1>(pdf) );
  packer | const_cast< std::vector<tk::TriPDF>& >( std::get<2>(pdf) );

  // Return size of and raw stream
  return { static_cast< int >( sizer.size() ), std::move(flatData) };
}

std::tuple< std::vector< tk::UniPDF >,
            std::vector< tk::BiPDF >,
            std::vector< tk::TriPDF > >
merge( CkReductionMsg* msg )
//******************************************************************************
// Deserialize and merge vectors of PDFs from Charm's CkReductionMsg
//! \param[in] msg Charm++ reduction message containing the serialized PDFs
//! \return Vector of merged PDFs
//! \author J. Bakosi
//******************************************************************************
{
  // Create PUP deserializer based on message passed in
  PUP::fromMem creator( msg->getData() );

  // Will store deserialized uni-, bi-, and tri-variate PDFs
  std::vector< tk::UniPDF > updf;
  std::vector< tk::BiPDF > bpdf;
  std::vector< tk::TriPDF > tpdf;

  // Deserialize PDFs from raw stream
  creator | updf;
  creator | bpdf;
  creator | tpdf;

  // Create tuple to hold all PDFs
  std::tuple< std::vector< tk::UniPDF >,
              std::vector< tk::BiPDF >,
              std::vector< tk::TriPDF > >
    res( std::vector< tk::UniPDF >( updf.size() ),
         std::vector< tk::BiPDF >( bpdf.size() ),
         std::vector< tk::TriPDF >( tpdf.size() ) );

  // Merge PDFs into tuple
  std::size_t i = 0;
  for (const auto& p : updf) std::get<0>(res)[i++].addPDF(p);
  i = 0;
  for (const auto& p : bpdf) std::get<1>(res)[i++].addPDF(p);
  i = 0;
  for (const auto& p : tpdf) std::get<2>(res)[i++].addPDF(p);

  // Return merged PDFs
  return res;
}

CkReductionMsg*
mergePDF( int nmsg, CkReductionMsg **msgs )
//******************************************************************************
// Charm++ custom reducer for merging PDFs during reduction across PEs
//! \param[in] nmsg Number of messages in msgs
//! \param[in] msgs Charm++ reduction message containing the serialized PDFs
//! \return Aggregated PDFs built for further aggregation if needed
//! \author J. Bakosi
//******************************************************************************
{
  // Deserialize the first message into the merged PDFs
  std::tuple< std::vector< tk::UniPDF >,
              std::vector< tk::BiPDF >,
              std::vector< tk::TriPDF > > pdf = tk::merge( msgs[0] );

  // Deserialize the remaining messages and add their partial sums to pdf
  for (int m=1; m<nmsg; ++m) {
    auto p = tk::merge( msgs[m] );
    std::size_t i = 0;
    for (const auto& q : std::get<0>(p)) std::get<0>(pdf)[i++].addPDF( q );
    i = 0;
    for (const auto& q : std::get<1>(p)) std::get<1>(pdf)[i++].addPDF( q );
    i = 0;
    for (const auto& q : std::get<2>(p)) std::get<2>(pdf)[i++].addPDF( q );
  }

  // Serialize vector of merged PDFs to raw stream
  auto stream = tk::serialize( pdf );

  // Forward serialized PDFs
  return CkReductionMsg::buildNew( stream.first, stream.second.get() );
}

} // tk::
@@ -0,0 +1,52 @@
//******************************************************************************
/*!
  \file src/Statistics/PDFUtil.h
  \author J. Bakosi
  \date Mon 01 Jun 2015 02:10:29 PM MDT
  \copyright 2012-2015, Jozsef Bakosi.
  \brief PDF utilities
  \details PDF utilities.
*/
//******************************************************************************
#ifndef PDFUtil_h
#define PDFUtil_h

#include <tuple>
#include <vector>

#if defined(__clang__) || defined(__GNUC__)
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wconversion"
#endif

#include <charm++.h>

#if defined(__clang__) || defined(__GNUC__)
#pragma GCC diagnostic pop
#endif

#include "UniPDF.h"
#include "BiPDF.h"
#include "TriPDF.h"

namespace tk {

//! Serialize vectors of PDFs to raw memory stream
std::pair< int, std::unique_ptr<char[]> >
serialize( const std::tuple< std::vector< tk::UniPDF >,
                             std::vector< tk::BiPDF >,
                             std::vector< tk::TriPDF > >& pdf );

//! Deserialize and merge vectors of PDFs from Charm's CkReductionMsg
std::tuple< std::vector< tk::UniPDF >,
            std::vector< tk::BiPDF >,
            std::vector< tk::TriPDF > >
merge( CkReductionMsg* msg );

//! Charm++ custom reducer for merging PDFs during reduction across PEs
CkReductionMsg*
mergePDF( int nmsg, CkReductionMsg **msgs );

} // tk::

#endif // PDFUtil_h
@@ -2,7 +2,7 @@
 /*!
   \file src/Statistics/UniPDF.h
   \author J. Bakosi
-  \date Mon 01 Jun 2015 02:08:28 PM MDT
+  \date Mon 20 Jul 2015 05:56:53 PM MDT
   \copyright 2012-2015, Jozsef Bakosi.
   \brief Univariate PDF estimator
   \details Univariate PDF estimator. This class can be used to estimate a
@@ -112,6 +112,21 @@ class UniPDF {
map_type m_pdf; //!< Probability density function
};

//! Output univariate PDF to output stream
//! \param[inout] os Stream to output to
//! \param[in] p PDF to output
//! \return Updated stream
//! \note Used for debugging.
//! \author J. Bakosi
static inline
std::ostream& operator<< ( std::ostream& os, const tk::UniPDF& p ) {
  os << p.binsize() << ", " << p.nsample() << ": ";
  std::map< typename tk::UniPDF::key_type, tk::real >
    sorted( p.map().begin(), p.map().end() );
  for (const auto& b : sorted) os << '(' << b.first << ',' << b.second << ") ";
  return os;
}

} // tk::

#endif // UniPDF_h
@@ -2,7 +2,7 @@
 /*!
   \file src/Walker/Collector.C
   \author J. Bakosi
-  \date Sat 11 Jul 2015 12:12:12 PM MDT
+  \date Mon 20 Jul 2015 08:22:17 PM MDT
   \copyright 2012-2015, Jozsef Bakosi.
   \brief Charm++ module interface file for collecting contributions from
     Integrators
@@ -11,6 +11,7 @@
 */
 //******************************************************************************
 
+#include "Make_unique.h"
 #include "Collector.h"
 
 using walker::Collector;
@@ -75,6 +76,115 @@ Collector::chareCen( const std::vector< tk::real >& cen )
}
}

void
Collector::chareOrdPDF( const std::vector< tk::UniPDF >& updf,
                        const std::vector< tk::BiPDF >& bpdf,
                        const std::vector< tk::TriPDF >& tpdf )
//******************************************************************************
// Chares contribute ordinary PDFs
//! \param[in] updf Vector of partial sums for the estimation of univariate
//! ordinary PDFs
//! \param[in] bpdf Vector of partial sums for the estimation of bivariate
//! ordinary PDFs
//! \param[in] tpdf Vector of partial sums for the estimation of trivariate
//! ordinary PDFs
//! \note This function does not have to be declared as a Charm++ entry
//! method since it is always called by chares on the same PE.
//! \author J. Bakosi
//******************************************************************************
{
  ++m_nopdf;

  // Add contribution from worker chares to partial sums on my PE
  std::size_t i = 0;
  m_ordupdf.resize( updf.size() );
  for (const auto& p : updf) m_ordupdf[i++].addPDF( p );

  i = 0;
  m_ordbpdf.resize( bpdf.size() );
  for (const auto& p : bpdf) m_ordbpdf[i++].addPDF( p );

  i = 0;
  m_ordtpdf.resize( tpdf.size() );
  for (const auto& p : tpdf) m_ordtpdf[i++].addPDF( p );

  // If all chares on my PE have contributed, send partial sums to host
  if (m_nopdf == m_nchare) {

    // Create Charm++ callback function for reduction.
    // Distributor::estimateOrdPDF() will be the final target of the reduction
    // where the results of the reduction will appear.
    CkCallback cb( CkIndex_Distributor::estimateOrdPDF(nullptr), m_hostproxy );

    // Serialize vector of PDFs to raw stream
    auto stream =
      tk::serialize( std::make_tuple( m_ordupdf, m_ordbpdf, m_ordtpdf ) );

    // Contribute serialized PDFs of partial sums to host via Charm++ reduction
    contribute( stream.first, stream.second.get(), PDFMerger, cb );

    // Zero counters for next collection operation
    m_nopdf = 0;
    for (auto& p : m_ordupdf) p.zero();
    for (auto& p : m_ordbpdf) p.zero();
    for (auto& p : m_ordtpdf) p.zero();
  }
}

void
Collector::chareCenPDF( const std::vector< tk::UniPDF >& updf,
                        const std::vector< tk::BiPDF >& bpdf,
                        const std::vector< tk::TriPDF >& tpdf )
//******************************************************************************
// Chares contribute central PDFs
//! \param[in] updf Vector of partial sums for the estimation of univariate
//! central PDFs
//! \param[in] bpdf Vector of partial sums for the estimation of bivariate
//! central PDFs
//! \param[in] tpdf Vector of partial sums for the estimation of trivariate
//! central PDFs
//! \note This function does not have to be declared as a Charm++ entry
//! method since it is always called by chares on the same PE.
//! \author J. Bakosi
//******************************************************************************
{
  ++m_ncpdf;

  // Add contribution from worker chares to partial sums on my PE
  std::size_t i = 0;
  m_cenupdf.resize( updf.size() );
  for (const auto& p : updf) m_cenupdf[i++].addPDF( p );

  i = 0;
  m_cenbpdf.resize( bpdf.size() );
  for (const auto& p : bpdf) m_cenbpdf[i++].addPDF( p );

  i = 0;
  m_centpdf.resize( tpdf.size() );
  for (const auto& p : tpdf) m_centpdf[i++].addPDF( p );

  // If all chares on my PE have contributed, send partial sums to host
  if (m_ncpdf == m_nchare) {

    // Create Charm++ callback function for reduction.
    // Distributor::estimateCenPDF() will be the final target of the reduction
    // where the results of the reduction will appear.
    CkCallback cb( CkIndex_Distributor::estimateCenPDF(nullptr), m_hostproxy );

    // Serialize vector of PDFs to raw stream
    auto stream =
      tk::serialize( std::make_tuple( m_cenupdf, m_cenbpdf, m_centpdf ) );

    // Contribute serialized PDFs of partial sums to host via Charm++ reduction
    contribute( stream.first, stream.second.get(), PDFMerger, cb );

    // Zero counters for next collection operation
    m_ncpdf = 0;
    for (auto& p : m_cenupdf) p.zero();
    for (auto& p : m_cenbpdf) p.zero();
    for (auto& p : m_centpdf) p.zero();
  }
}
#if defined(__clang__) || defined(__GNUC__)
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wconversion"