
RDataFrame integration #588

Closed
jpivarski opened this issue Dec 10, 2020 · 11 comments
Assignees
Labels
feature New feature or request

Comments

@jpivarski
Member

This issue is to collect my thoughts about how RDataFrame integration could be done. Such a thing would be useful because physicists could then mix analyses using Awkward Array, Numba, and ROOT C++ without leaving their environment. The benefits compound:

  1. Data that are too complex to read from Uproot (efficiently or at all) can be loaded using MakeRootDataFrame and dumped into an Awkward Array.
  2. Arbitrarily complex Awkward Arrays can be written to ROOT files by dumping the Awkward Arrays into an RDataFrame and taking a Snapshot.
  3. Users can use ROOT C++ functions in an otherwise Awkward analysis at "full" speed. (Quotes because there is a conversion penalty, but it's compiled code, so not bad.)

Should we use Arrow? No.

In principle, we should be able to convert Awkward Arrays to and from RDataFrame using Apache Arrow, but there only seems to be an RArrowDS and not an action that converts back to Arrow (like AsNumpy), and even the RArrowDS only forwards a reference to arrow::Table—trying to include the Arrow headers from my conda-installed version of ROOT fails. Perhaps the version of ROOT in conda-forge is not linked to Arrow. If Arrow access is not consistent across ROOT installations (i.e. it's a compile-time option or something), I don't want to rely on it—it will fail for too many users.

Besides, I didn't see an example of RArrowDS for anything but primitive types: if a jagged array is passed from Arrow to RDataFrame, how does it appear to a physicist writing code for a Define action? Like strange arrow:: stuff? Taking this and the above consideration together, I don't think Arrow is the right route for Awkward → RDataFrame, despite first appearances.

Code generation for Awkward → RDataFrame

But code generation could be a good way to go. Each attempt to create an RDataFrame from an Awkward Array with a distinct Form (probably via a function named ak.to_rdataframe) could try to import ROOT (no explicit dependency), build a string of C++ code, and run it through ROOT.gInterpreter.Declare (with unique names, to permit repeated definitions). Any nested records could be declared as new structs so that nested fields can be accessed as record.fieldName (as long as field names match [A-Za-z_][A-Za-z_0-9]*). Any nested lists could be emplaced into ROOT::VecOps::RVec. There's a performance penalty for converting columnar data into record-oriented data like this, but most physics use-cases don't run at such high speeds that this would be the bottleneck.
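As a rough illustration of the code-generation idea (a hypothetical helper, not the proposed API): given a mapping from record field names to C++ type names, one can emit a uniquely named struct declaration, whose text would then be handed to ROOT's interpreter.

```python
import re

def make_struct_decl(name, fields):
    """Generate a C++ struct declaration for one level of an Awkward record.

    `fields` maps field names to C++ type names. Field names must be valid
    C++ identifiers so that users can write record.fieldName in a Define.
    """
    identifier = re.compile(r"^[A-Za-z_][A-Za-z_0-9]*$")
    lines = [f"struct {name} {{"]
    for field, ctype in fields.items():
        if not identifier.match(field):
            raise ValueError(f"{field!r} is not a valid C++ identifier")
        lines.append(f"  {ctype} {field};")
    lines.append("};")
    return "\n".join(lines)

# a nested list of records would combine RVec with a generated struct:
decl = make_struct_decl("Muon_1", {"pt": "double", "charge": "int8_t"})
```

The unique suffix on the struct name (here `_1`) is what permits repeated definitions in the same interpreter session.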

The code to generate would look a lot like this (shamelessly stolen from RTrivialDS with "Trivial" changed to "Wonky"):

#include "ROOT/RDF/RInterface.hxx"
#include "ROOT/RDataSource.hxx"
#include <ROOT/RDF/Utils.hxx>
#include <ROOT/TSeq.hxx>
#include <ROOT/RMakeUnique.hxx>
#include <TError.h>

#include <limits>

namespace ROOT {

namespace RDF {

class RWonkyDS final : public ROOT::RDF::RDataSource {
private:
   unsigned int fNSlots = 0U;
   ULong64_t fSize = 0ULL;
   bool fSkipEvenEntries = false;
   std::vector<std::pair<ULong64_t, ULong64_t>> fEntryRanges;
   std::vector<std::string> fColNames{"col0"};
   std::vector<ULong64_t> fCounter;
   std::vector<ULong64_t *> fCounterAddr;
   std::vector<void *> GetColumnReadersImpl(std::string_view name, const std::type_info &);

protected:
   std::string AsString() { return "wonky data source"; }

public:
   RWonkyDS(ULong64_t size, bool skipEvenEntries = false);
   /// This ctor produces a data-source that returns infinite entries
   RWonkyDS();
   ~RWonkyDS();
   const std::vector<std::string> &GetColumnNames() const;
   bool HasColumn(std::string_view colName) const;
   std::string GetTypeName(std::string_view) const;
   std::vector<std::pair<ULong64_t, ULong64_t>> GetEntryRanges();
   bool SetEntry(unsigned int slot, ULong64_t entry);
   void SetNSlots(unsigned int nSlots);
   void Initialise();
   std::string GetLabel();
};

// Make a RDF wrapping a RWonkyDS with the specified amount of entries
RInterface<RDFDetail::RLoopManager, RWonkyDS> MakeWonkyDataFrame(ULong64_t size, bool skipEvenEntries = false);
// Make a RDF wrapping a RWonkyDS with infinite entries
RInterface<RDFDetail::RLoopManager, RWonkyDS> MakeWonkyDataFrame();

std::vector<void *> RWonkyDS::GetColumnReadersImpl(std::string_view, const std::type_info &ti)
{
   // We know we have only one column and that it's holding ULong64_t's
   if (ti != typeid(ULong64_t)) {
      throw std::runtime_error("The type specified for the column \"col0\" is not ULong64_t.");
   }
   std::vector<void *> ret;
   for (auto i : ROOT::TSeqU(fNSlots)) {
      fCounterAddr[i] = &fCounter[i];
      ret.emplace_back((void *)(&fCounterAddr[i]));
   }
   return ret;
}

RWonkyDS::RWonkyDS(ULong64_t size, bool skipEvenEntries) : fSize(size), fSkipEvenEntries(skipEvenEntries)
{
}

RWonkyDS::RWonkyDS() : fSize(std::numeric_limits<ULong64_t>::max()), fSkipEvenEntries(false)
{
}

RWonkyDS::~RWonkyDS()
{
}

const std::vector<std::string> &RWonkyDS::GetColumnNames() const
{
   return fColNames;
}

bool RWonkyDS::HasColumn(std::string_view colName) const
{
   return colName == fColNames[0];
}

std::string RWonkyDS::GetTypeName(std::string_view) const
{
   return "ULong64_t";
}

std::vector<std::pair<ULong64_t, ULong64_t>> RWonkyDS::GetEntryRanges()
{
   if (fSize == std::numeric_limits<ULong64_t>::max()) {
      auto currentEntry = *std::max_element(fCounter.begin(), fCounter.end());
      // infinite source, just make some ranges up
      std::vector<std::pair<ULong64_t, ULong64_t>> ranges(fNSlots);
      for (auto &range : ranges) {
         range = std::make_pair(currentEntry, currentEntry + 10);
         currentEntry += 10;
      }
      return ranges;
   }

   // empty fEntryRanges so we'll return an empty vector on subsequent calls
   auto ranges = std::move(fEntryRanges);
   return ranges;
}

bool RWonkyDS::SetEntry(unsigned int slot, ULong64_t entry)
{
   if (fSkipEvenEntries && 0 == entry % 2) {
      return false;
   }
   fCounter[slot] = entry;
   return true;
}

void RWonkyDS::SetNSlots(unsigned int nSlots)
{
   R__ASSERT(0U == fNSlots && "SetNSlots should be called only once, while fNSlots is still zero.");

   fNSlots = nSlots;
   fCounter.resize(fNSlots);
   fCounterAddr.resize(fNSlots);
}

void RWonkyDS::Initialise()
{
   if (fSize == std::numeric_limits<ULong64_t>::max()) {
      // infinite source, nothing to do here
      return;
   }

   // initialize fEntryRanges
   const auto chunkSize = fSize / fNSlots;
   auto start = 0UL;
   auto end = 0UL;
   for (auto i : ROOT::TSeqUL(fNSlots)) {
      start = end;
      end += chunkSize;
      fEntryRanges.emplace_back(start, end);
      (void)i;
   }
   // TODO: redistribute remainder across all slots
   fEntryRanges.back().second += fSize % fNSlots;
}

std::string RWonkyDS::GetLabel()
{
   return "WonkyDS";
}

RInterface<RDFDetail::RLoopManager, RWonkyDS> MakeWonkyDataFrame(ULong64_t size, bool skipEvenEntries)
{
   auto lm = std::make_unique<RDFDetail::RLoopManager>(std::make_unique<RWonkyDS>(size, skipEvenEntries),
                                                       RDFInternal::ColumnNames_t{});
   return RInterface<RDFDetail::RLoopManager, RWonkyDS>(std::move(lm));
}

RInterface<RDFDetail::RLoopManager, RWonkyDS> MakeWonkyDataFrame()
{
   auto lm = std::make_unique<RDFDetail::RLoopManager>(std::make_unique<RWonkyDS>(), RDFInternal::ColumnNames_t{});
   return RInterface<RDFDetail::RLoopManager, RWonkyDS>(std::move(lm));
}

} // ns RDF

} // ns ROOT

void testy() {
  auto printEntrySlot = [](ULong64_t iEntry, ULong64_t slot) {
      std::cout << "Entry: " << iEntry << " Slot: " << slot << std::endl;
  };
  auto d_s = ROOT::RDF::MakeWonkyDataFrame(10);
  d_s.Foreach(printEntrySlot, {"rdfentry_", "col0"});
}

RDataFrame → Awkward

For the other direction, creating an Awkward Array from RDataFrame, it would perhaps be most natural to import the ArrayBuilder functions into the ROOT C++ context through function pointers, exactly as they have been imported into the Numba context. They could then be used by users in an RDataFrame Foreach action—which is decidedly non-functional in style, but that's how it's done in Numba—or we could monkey-patch an AsAwkward action onto RInterface that generates the arrays of the Awkward layout as columns and provides them to Python through AsNumpy. That could be exactly the format required by ak.from_arrayset.
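The Foreach-with-builder pattern can be sketched in pure Python with a stand-in class (no ROOT or Awkward required): a builder object is called once per entry, appending to a flat content buffer and an offsets buffer—exactly the columnar layout a jagged array needs.

```python
class JaggedBuilder:
    """Toy stand-in for ArrayBuilder: records one variable-length list
    per entry as a flat content buffer plus offsets."""

    def __init__(self):
        self.content = []
        self.offsets = [0]

    def begin_list(self):
        pass  # nothing needed for this toy layout

    def real(self, x):
        self.content.append(float(x))

    def end_list(self):
        self.offsets.append(len(self.content))

builder = JaggedBuilder()
for entry in [[1.1, 2.2], [], [3.3]]:   # what a Foreach would do per event
    builder.begin_list()
    for x in entry:
        builder.real(x)
    builder.end_list()
```

After the loop, `builder.offsets` and `builder.content` are the one-dimensional arrays that the Python side would reassemble into a jagged Awkward Array.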

It seems that monkey-patching is indeed possible:

>>> import ROOT
>>> ROOT.RDF.RInterface("ROOT::Detail::RDF::RLoopManager", "void").whatever = lambda self: "HELLO " + repr(self)
>>> 
>>> rdf = ROOT.RDF.MakeRootDataFrame("events", "/home/jpivarski/miniconda3/lib/python3.8/site-packages/skhep_testdata/data/uproot-HZZ-objects.root")
>>> tmp = rdf.Define("x", "eventweight * 2")
>>> 
>>> rdf.whatever()
'HELLO <cppyy.gbl.ROOT.RDataFrame object at 0x5575a14f19a0>'
>>> tmp.whatever()
'HELLO <cppyy.gbl.ROOT.RDF.RInterface<ROOT::Detail::RDF::RLoopManager,void> object at 0x5575a168a750>'

Even for objects made before the monkey-patching:

>>> import ROOT
>>> rdf = ROOT.RDF.MakeRootDataFrame("events", "/home/jpivarski/miniconda3/lib/python3.8/site-packages/skhep_testdata/data/uproot-HZZ-objects.root")
>>> tmp = rdf.Define("x", "eventweight * 2")
>>> 
>>> ROOT.RDF.RInterface("ROOT::Detail::RDF::RLoopManager", "void").whatever = lambda self: "HELLO " + repr(self)
>>> 
>>> rdf.whatever()
'HELLO <cppyy.gbl.ROOT.RDataFrame object at 0x55d8c4cd91b0>'
>>> tmp.whatever()
'HELLO <cppyy.gbl.ROOT.RDF.RInterface<ROOT::Detail::RDF::RLoopManager,void> object at 0x55d8c6063660>'

So this allows us to fully decouple installation requirements (ROOT does not depend on Awkward Array and Awkward Array does not depend on ROOT) while allowing us to add an AsAwkward action to every RInterface in PyROOT. The downside is that it has to be installed by some Awkward import, such as

>>> import awkward.pyroot   # or maybe awkward.ROOT?

Without this import, the RDataFrame nodes would not have an AsAwkward action. Numba avoids this by checking for entry points in setuptools, but doing that here would require a change to ROOT. If this project goes well and people find it useful, then adding an entry points mechanism to PyROOT would be better motivated.
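What `import awkward.pyroot` could do at import time is shown below with a stand-in class rather than the real cppyy-backed RInterface (all names here are hypothetical):

```python
class RInterfaceStandIn:
    """Stand-in for the class object ROOT.RDF.RInterface(...) exposes in PyROOT."""

    def __init__(self, columns):
        self.columns = columns  # column name -> data, for the demo

def _as_awkward(self, *columns):
    # in the real bridge, this would Book a C++ helper action and call
    # ak.from_buffers on the resulting 1-dimensional arrays
    return {name: self.columns[name] for name in columns}

# the hypothetical awkward.pyroot module would attach the method once,
# at import time, to every RInterface specialization it cares about:
RInterfaceStandIn.AsAwkward = _as_awkward

rdf = RInterfaceStandIn({"x": [1, 2, 3], "y": [4, 5, 6]})
result = rdf.AsAwkward("x")
```

Because Python method lookup happens at call time, objects created before the patch (as demonstrated above with the real RInterface) pick up the new method too.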

People who might be interested

@eguiraud and @etejedor, probably! The above project is speculative at this point and no one has asked for it. I just got to thinking that such an interface should exist, since it would multiply the possibilities available to physicists doing analysis.

@jpivarski jpivarski added the feature New feature or request label Dec 10, 2020
@jpivarski jpivarski added this to Idea in Long-term projects Dec 10, 2020
@jpivarski jpivarski moved this from Python ideas to C++ ideas in Long-term projects Dec 15, 2020
@jpivarski jpivarski moved this from C++ ideas to Python ideas in Long-term projects Dec 15, 2020
@clementhelsens

I add myself here, as discussed over email to help with this project.

@eguiraud

Argh, never replied, sorry! A RDataSource to feed awkward arrays into RDF is the correct thing to use, and to output awkward arrays a custom action can be added to RDF using the Book method (and a nice alias can be monkey-patched via Python).

@jpivarski
Member Author

I didn't know about Book, and it does look like the right way to get a set of columns out. If it can be made to return a dict of arrays in Python, I can follow up with ak.from_buffers. I wasn't thinking that AsNumpy would be right, because these arrays would have to be a different view of the columns than what the user-interface actions (such as Define) see: the user should see std::vectors and structs, but ak.from_buffers should get one-dimensional arrays of offsets and content (and some metadata that can be used to make the ak.forms.Form that Awkward Array needs to build the output array).

The last action would also need to be non-lazy, so I guess I'll need something after Book to trigger execution.

@eguiraud

Book per se is a C++ thing; it cannot return Python dicts. But the idea is that Book(AwkwardArrayMaker{}, ...) would inject C++ logic into RDataFrame's event loop (giving you access to multi-threading, laziness, etc.) and return a C++ object, and then the monkey-patched Python method can "decorate" that Book invocation to make it more Pythonic and, if you want, to make it eager. E.g. df.AsAwkward("x") would execute something like (I'm simplifying a bit):

def AsAwkward(dataframe, column):
  awk_arr_maker = AwkwardArrayMaker()
  res = dataframe.Book(awk_arr_maker, column).GetValue()  # calling GetValue right away makes this eager
  # res is now some C++ struct exposed to Python via PyROOT;
  # use it from Python, or invoke a helper C++ function that
  # converts the C++ arrays into Awkward Arrays
  return res

@jpivarski
Member Author

That is helpful, thanks! I will want to do a part of the translation on the C++ side, in order to make C++ native objects like std::vectors and structs that would be the user's interface when they're writing Define, Filter, etc. From a global view, this is how I see the conversion:

  1. Awkward Array goes through ak.to_buffers in Python to break it down into 1-dimensional arrays of contents and indexes (such as "offsets" for lists).
  2. A particular array generates custom C++ code (the code-generation would be in Python) to turn those 1-dimensional arrays into the appropriate C++ objects, such as std::vectors and structs, with whatever nesting is appropriate for the ak.Array's data type.
  3. Users write Define, Filter, etc., operating exclusively on C++ objects.
  4. The output C++ objects get converted into a set of new 1-dimensional arrays, again using C++ code generated from the Python side. This would likely be a Book.
  5. These 1-dimensional arrays are gathered into a dict and passed into ak.from_buffers on the Python side, probably in a monkey-patched RDataFrame method named AsAwkward (for symmetry with AsNumpy), to turn the output into Awkward Arrays.

Steps 1‒2 are independent of steps 4‒5. Someone might do one, the other, or both.
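Steps 1 and 5, stripped to their essence in pure Python (standing in for what ak.to_buffers and ak.from_buffers do): a jagged array decomposes into flat offsets-plus-content buffers, and the same buffers reassemble the nested lists.

```python
def to_buffers(jagged):
    """Decompose a list-of-lists into flat 1-dimensional buffers (step 1)."""
    offsets, content = [0], []
    for sublist in jagged:
        content.extend(sublist)
        offsets.append(len(content))
    return {"offsets": offsets, "content": content}

def from_buffers(buffers):
    """Reassemble the nested lists from the flat buffers (step 5)."""
    offsets, content = buffers["offsets"], buffers["content"]
    return [content[start:stop] for start, stop in zip(offsets, offsets[1:])]

buffers = to_buffers([[1.1, 2.2], [3.3], [4.4, 5.5]])
roundtrip = from_buffers(buffers)
```

Steps 2 and 4 are the generated C++ that converts between these flat buffers and std::vectors/structs inside the event loop; the real to_buffers/from_buffers additionally carry the Form metadata needed for arbitrary nesting.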

There's something I hadn't considered about step 4, though: I need to inspect the C++ output types of a node in the RDataFrame DAG from Python. The reflection information must be available for RDataFrame to work, but is it publicly accessible? For instance, given a Python object representing a node in an RDataFrame (ROOT.RDF.RInterface?), is it possible to get the names of defined columns and their types? How are their types represented (ROOT.TClass? if so, what about numbers and STL collections)? That's what I'd need to know to generate the appropriate C++ when an AsAwkward node is added to an RInterface.

@eguiraud

RDF has a GetColumnType("x") method that returns the type of column "x" as a std::string. Collections take the type RVec<T>, where RVec is the numpy-like vector type that RDF users use to manipulate collections.

GetColumnType is not fast, but if that ever becomes the bottleneck (I doubt it) there are options.
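Parsing the strings that GetColumnType returns is straightforward; a minimal sketch that recognizes the RVec case (the type strings match the comment above; the parser itself is hypothetical):

```python
def parse_rvec(type_string):
    """Split a GetColumnType result into ("RVec", element_type) for
    collection columns, or (None, type_string) for scalar columns."""
    s = type_string.strip()
    for prefix in ("ROOT::VecOps::RVec<", "RVec<"):
        if s.startswith(prefix) and s.endswith(">"):
            return ("RVec", s[len(prefix):-1].strip())
    return (None, s)
```

Applied recursively, this handles nested collections too: `parse_rvec("ROOT::VecOps::RVec<RVec<float>>")` yields an RVec whose element type can be parsed again.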

@jpivarski
Member Author

A string is probably best, anyway. I can parse templates to get all the STL structures, and if I encounter any opaque class names, I can look them up with ROOT.TClass.GetClass to get their fields. There might be type aliases (such as Size_t), but that can be a lookup table that grows as we encounter more cases (what Uproot does).

As for it not being fast, the type lookup, code generation, and Cling compilation are all once-per-array operations, rather than once-per-element, and I generally only worry about the performance of the latter. (I.e. we want the scaling to have a good slope, but are less concerned with the offset: defining the goal to optimize infinitely large arrays.) Since RDF also does a one-time compilation step, this isn't very different from what you do. If I make ak.forms.Form hashable, to put in a global dict, it can be a once-per-type-of-array operation: repeatedly applying arrays of the same type could reuse previously generated and compiled interfaces.

Oh, and I should make the interface to collections be RVec instead of std::vector? I did see that AsNumpy makes RVecs of PyROOT objects when the columns are not numeric types. If RVec is the preferred user interface for collections in RDF, I'll do that. (I see now: RVec enhances std::vector with np.take- and np.compress-like slicing.)

@NJManganelli

If I may step in and generate a bit of noise, I'd support this on principle alone. I think many analyses are moving forward in frameworks being built on uproot+awkward or RDataFrame, and I agree having interoperability will be a big boon for analyses and analyzers. As I'm careening towards thesis writing and graduation, I'm not sure I have much bandwidth for meaningful contributions, but I'll be watching (and that may perhaps change by the time this gets attention from whoever will work on it).

I can say that I wish this existed right now, I would definitely have uses for converting from my nearly-complete analysis based on RDF, to awkward, and back again. Interfacing with industry standard tools and already trained models for ML and round-tripping back to root output, without introducing even more bottle-necking intermediate stages, falling back to single-threaded execution, or rewriting things for the complete flattening I'd need using AsNumpy/MakeNumpyDataFrame, would be fantastic.

Which raises a question from me, Jim: do you envision this as a bulk operation like AsNumpy, where all columns/rows are held in memory at once, or as something that can operate on chunks at a time? That may be something for much later, once a working implementation can take an entire dataset from one to the other and back.

@jpivarski
Member Author

Good to know there's interest!

If it's implemented as a bulk operation, we can get the chunk-at-a-time functionality from that. The natural implementation for partitioned arrays (ak.partitioned) would be to step through the partitions, running the RDF bridge for each partition at a time. If the partitioned array is also virtual (i.e. it's a lazy array) and the RDF actions fill histograms or write to ROOT files (i.e. do not generate growing in-memory output), then such a process would never have more than one chunk of data in memory (for a suitably chosen lazy cache). That statement has a lot of qualifiers on it, but some examples of "best practices" could indicate how to do it within a memory budget.
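The chunk-at-a-time behavior falling out of the bulk operation can be sketched in pure Python (`run_rdf_bridge` is a hypothetical stand-in for converting one partition and running the RDF actions on it):

```python
def iter_partitions(data, partition_size):
    """Yield successive chunks, so only one is materialized at a time."""
    for start in range(0, len(data), partition_size):
        yield data[start:start + partition_size]

def run_rdf_bridge(chunk):
    # stand-in: the real bridge would build an RDataFrame for this chunk
    # and run actions that fill histograms or write to a ROOT file
    return sum(chunk)

total = 0
for chunk in iter_partitions(list(range(10)), partition_size=4):
    total += run_rdf_bridge(chunk)
```

As long as the per-chunk actions accumulate small outputs (histogram bins, file writes) rather than growing in-memory arrays, peak memory stays at one partition.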

@ianna
Collaborator

ianna commented Jun 23, 2022

FYI, both ak._v2.to_rdataframe and ak._v2.from_rdataframe are now in a pre-release. You can pick it up with

pip install --pre awkward

Here is an example of how to use it:

import awkward as ak

def test_data_frame_vec_of_real():
    ak_array_in = ak._v2.Array([[1.1, 2.2], [3.3], [4.4, 5.5]])

    data_frame = ak._v2.to_rdataframe({"x": ak_array_in})

    assert data_frame.GetColumnType("x") == "ROOT::VecOps::RVec<double>"

    ak_array_out = ak._v2.from_rdataframe(
        data_frame,
        column="x",
    )
    assert ak_array_in.to_list() == ak_array_out["x"].to_list()

Please, let me know if there are any issues, or requests. Thanks!

@ianna
Collaborator

ianna commented Aug 31, 2022

Completed in #1625. A short tutorial is here.

@ianna ianna closed this as completed Aug 31, 2022
Prioritized issues automation moved this from In progress to Done Aug 31, 2022
5 participants