[BACKPORT] Add mars.learn.metrics.multilabel_confusion_matrix and derivative metrics (#2554) #2568

Merged (2 commits) on Nov 2, 2021
6 changes: 6 additions & 0 deletions .github/PULL_REQUEST_TEMPLATE.md
@@ -11,3 +11,9 @@ Please review https://github.com/mars-project/mars/blob/master/CONTRIBUTING.rst
## Related issue number

<!-- Are there any issues opened that will be resolved by merging this change? -->
Fixes #xxxx

## Check code requirements

- [ ] tests added / passed (if needed)
- [ ] Ensure all linting tests pass, see [here](https://docs.pymars.org/en/latest/development/contributing.html#check-code-styles) for how to run them
2 changes: 2 additions & 0 deletions docs/source/development/contributing.rst
@@ -131,6 +131,8 @@ We also require relative import in code for all Mars modules. Use

to check whether your code meets the requirement.

.. _build_documentation:

Building Documentations
-----------------------
Mars uses ``sphinx`` to build documents. You need to install necessary packages
1 change: 1 addition & 0 deletions docs/source/development/index.rst
@@ -7,5 +7,6 @@ Development

contributing
overview
operand
oscar/index
services/index
253 changes: 253 additions & 0 deletions docs/source/development/operand.rst
@@ -0,0 +1,253 @@
.. _operand_implementation:

Implement a Mars Operand
========================

We use ``read_csv`` as an example to illustrate how to implement a Mars operand.

Define Operand Class
--------------------

All Mars operands inherit from the base class ``Operand``, which defines the
basic properties of an operand. Each module has its own subclass, such as
``DataFrameOperand``, ``TensorOperand``, etc. A tileable operand also needs to
inherit from ``TileableOperandMixin`` to implement the ``tile`` and ``execute``
functions. So we first define the operand class and set the output types in its
init method; the types can be DataFrame or Tensor, depending on the type of the
operand's output data. A ``__call__`` method is also needed to create a Mars
DataFrame.

.. code-block:: python

    # NOTE: Use relative import if in Mars modules
    import numpy as np

    from mars.core import OutputType
    from mars.dataframe.operands import DataFrameOperand, DataFrameOperandMixin
    from mars.serialization.serializables import StringField


    class SimpleReadCSV(DataFrameOperand, DataFrameOperandMixin):
        path = StringField('path')

        def __init__(self, **kw):
            super().__init__(**kw)
            self._output_types = [OutputType.dataframe]

        def __call__(self, index_value=None, columns_value=None,
                     dtypes=None, chunk_bytes=None):
            # np.nan means the size is unknown on axis 0
            shape = (np.nan, len(dtypes))
            return self.new_dataframe(
                None,
                shape,
                dtypes=dtypes,
                index_value=index_value,
                columns_value=columns_value,
                chunk_bytes=chunk_bytes,
            )

For the ``SimpleReadCSV`` operand, the ``path`` property stores the path of the
CSV file. We declare it as a ``StringField`` to indicate the property's type,
which is used for serialization. If the type is uncertain, ``AnyField`` works.

Implement Tile Method
---------------------

The ``tile`` method comes next. It splits the computing task into several
subtasks that, ideally, can be assigned to different executors and run in
parallel. In the specific case of ``read_csv``, each subtask reads a block of
bytes from the file, so we need to calculate the offset and length of each
block in the tile function. As the same class serves both the coarse-grained
and the fine-grained operand, ``offset``, ``length`` and other properties are
added to record the information needed by the fine-grained operand.

.. code-block:: python

import os

import numpy as np

from mars.core import OutputType
from mars.dataframe.operands import DataFrameOperand, DataFrameOperandMixin
from mars.serialization.serializables import AnyField, StringField, Int64Field
from mars.utils import parse_readable_size


    class SimpleReadCSV(DataFrameOperand, DataFrameOperandMixin):
        path = StringField("path")
        chunk_bytes = AnyField("chunk_bytes")
        offset = Int64Field("offset")
        length = Int64Field("length")

        def __init__(self, **kw):
            super().__init__(**kw)
            self._output_types = [OutputType.dataframe]

@classmethod
def tile(cls, op: "SimpleReadCSV"):
# out is operand's output in coarse-grained graph
out = op.outputs[0]

file_path = op.path
file_size = os.path.getsize(file_path)

# split file into chunks
chunk_bytes = int(parse_readable_size(op.chunk_bytes)[0])
offset = 0
index_num = 0
out_chunks = []
while offset < file_size:
chunk_op = op.copy().reset_key()
chunk_op.path = file_path
# offset and length for current chunk
chunk_op.offset = offset
chunk_op.length = min(chunk_bytes, file_size - offset)
# calculate chunk's meta, including shape, index_value, columns_value
# here we use np.nan to represent unknown shape
shape = (np.nan, len(out.dtypes))
# use `op.new_chunk` to create a dataframe chunk
new_chunk = chunk_op.new_chunk(
None,
shape=shape,
index=(index_num, 0),
index_value=out.index_value,
columns_value=out.columns_value,
dtypes=out.dtypes,
)
offset += chunk_bytes
index_num += 1
out_chunks.append(new_chunk)

# create a new tileable which holds `chunks` for generating fine-grained graph
new_op = op.copy()
            # `nsplits` records the split info for each axis. For read_csv,
            # the output dataframe is split into multiple chunks on axis 0
            # and kept as a single chunk on axis 1, so nsplits looks like
            # ((np.nan, np.nan, ...), (out.shape[1],))
nsplits = ((np.nan,) * len(out_chunks), (out.shape[1],))
return new_op.new_dataframes(
None,
out.shape,
dtypes=out.dtypes,
index_value=out.index_value,
columns_value=out.columns_value,
chunks=out_chunks,
nsplits=nsplits,
)
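
The offset/length arithmetic performed by the loop above can be checked in
isolation. Here is a plain-Python sketch, independent of Mars (the name
``split_into_blocks`` is illustrative, not a Mars API):

```python
def split_into_blocks(file_size, chunk_bytes):
    """Return (offset, length) pairs covering `file_size` bytes,
    each block at most `chunk_bytes` long, mirroring the tile loop."""
    blocks = []
    offset = 0
    while offset < file_size:
        blocks.append((offset, min(chunk_bytes, file_size - offset)))
        offset += chunk_bytes
    return blocks

print(split_into_blocks(100, 30))
# → [(0, 30), (30, 30), (60, 30), (90, 10)]
```

Note that only the last block may be shorter than ``chunk_bytes``, which is
exactly why the last chunk of the dataframe may turn out to be empty after
line alignment.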


Implement Execute Method
------------------------

When a subtask is delivered to an executor, Mars calls the operand's
``execute`` method to perform the actual computation. For ``read_csv``, we need
to read a block from the file according to ``offset`` and ``length``. However,
``offset`` is only a rough position, since we cannot start reading a CSV file
from the middle of a line; we use ``readline`` to align the block's start and
end to delimiter boundaries.

.. code-block:: python

    from io import BytesIO
    from typing import Union

    import pandas as pd

    from mars.core.context import Context
    from mars.dataframe.operands import DataFrameOperand, DataFrameOperandMixin
    from mars.dataframe.utils import build_empty_df


def _find_chunk_start_end(f, offset, size):
f.seek(offset)
if f.tell() == 0:
start = 0
else:
f.readline()
start = f.tell()
f.seek(offset + size)
f.readline()
end = f.tell()
return start, end


class SimpleReadCSV(DataFrameOperand, DataFrameOperandMixin):
@classmethod
def execute(cls, ctx: Union[dict, Context], op: "SimpleReadCSV"):
out = op.outputs[0]
with open(op.path, 'rb') as f:
start, end = _find_chunk_start_end(f, op.offset, op.length)
if end == start:
# the last chunk may be empty
data = build_empty_df(out.dtypes)
else:
f.seek(start)
if start == 0:
# The first chunk contains header, skip header rows
data = pd.read_csv(BytesIO(f.read(end - start)),
skiprows=1,
names=out.dtypes.index)
else:
data = pd.read_csv(BytesIO(f.read(end - start)),
names=out.dtypes.index)

ctx[out.key] = data
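
To see how the helper snaps rough byte offsets to line boundaries, here is a
self-contained run on in-memory data (the sample bytes are made up for
illustration):

```python
from io import BytesIO


def _find_chunk_start_end(f, offset, size):
    # same helper as above: widen/shift the raw block to whole lines
    f.seek(offset)
    if f.tell() == 0:
        start = 0
    else:
        f.readline()
        start = f.tell()
    f.seek(offset + size)
    f.readline()
    end = f.tell()
    return start, end


data = BytesIO(b"a,b\n1,x\n2,y\n3,z\n")
# the raw block [5, 11) starts and ends mid-line ...
start, end = _find_chunk_start_end(data, 5, 6)
# ... but the returned range (8, 12) covers exactly the line b"2,y\n"
data.seek(start)
print(data.read(end - start))  # → b'2,y\n'
```

Because every byte after the first newline past ``offset`` belongs to the next
block, adjacent blocks never read the same line twice and no line is lost.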

After reading the chunk data with ``pd.read_csv``, we store the result in
``ctx``. ``SimpleReadCSV`` has only one output here; for an operand like
``SVD`` that has multiple outputs, each result is stored separately under the
corresponding output's key.
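
The multiple-output pattern can be sketched with plain NumPy (the names
``execute_svd_like`` and ``output_keys`` are hypothetical, not Mars APIs; a
real Mars operand would take the keys from ``op.outputs``):

```python
import numpy as np


def execute_svd_like(ctx, block, output_keys):
    # an SVD-like operand produces three results; each one is stored
    # in the execution context under its own output key
    u, s, v = np.linalg.svd(block, full_matrices=False)
    for key, value in zip(output_keys, (u, s, v)):
        ctx[key] = value


ctx = {}
execute_svd_like(ctx, np.eye(3), ["u_key", "s_key", "v_key"])
# ctx now holds three entries, one per output
```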

Define User Interface
---------------------

Finally, we define the ``read_csv`` function exposed to users. Besides creating
a ``SimpleReadCSV`` operand, this function reads a small sample of the data to
infer meta information of the Mars DataFrame, such as dtypes, columns and
index.

.. code-block:: python

    from io import BytesIO

    import pandas as pd

    from mars.dataframe.utils import parse_index

def read_csv(file_path, chunk_bytes='16M'):
# use first 10 lines to infer
with open(file_path, 'rb') as f:
head_lines = b"".join([f.readline() for _ in range(10)])

mini_df = pd.read_csv(BytesIO(head_lines))
index_value = parse_index(mini_df.index)
columns_value = parse_index(mini_df.columns, store_data=True)
dtypes = mini_df.dtypes
op = SimpleReadCSV(path=file_path, chunk_bytes=chunk_bytes)
return op(
index_value=index_value,
columns_value=columns_value,
dtypes=dtypes,
chunk_bytes=chunk_bytes,
)
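
The sampling-based meta inference used above can be demonstrated with plain
pandas (a stand-alone sketch with made-up data, no Mars required):

```python
from io import BytesIO

import pandas as pd

csv_bytes = b"int_col,str_col\n1,a\n2,b\n3,c\n"
f = BytesIO(csv_bytes)
# take only the first lines, as read_csv above does with a real file;
# readline() past EOF returns b"", so short files are fine
head_lines = b"".join(f.readline() for _ in range(10))
mini_df = pd.read_csv(BytesIO(head_lines))

print(list(mini_df.columns))           # → ['int_col', 'str_col']
print(str(mini_df.dtypes["int_col"]))  # → int64
```

The dtypes inferred from the sample are what ``tile`` and ``execute`` later
rely on, so a sample that is not representative of the whole file can lead to
wrong metadata.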

Write Tests
-----------

Mars uses pytest for testing. We can add tests under the ``tests`` subdirectory
of the relevant module, following the existing examples. Define a test function
and use the shared fixture ``setup`` to run the test in the test environment.

.. code-block:: python

    import os
    import tempfile

    import numpy as np
    import pandas as pd


    def test_simple_read_csv_execution(setup):
        with tempfile.TemporaryDirectory() as tempdir:
            file_path = os.path.join(tempdir, "test.csv")
            # write a csv file as test input
            raw = pd.DataFrame({
                'int': range(10),
                'float': np.random.rand(10),
                'str': [f'value_{i}' for i in range(10)]
            })
            raw.to_csv(file_path, index=False)
            mdf = read_csv(file_path).execute().fetch()
            pd.testing.assert_frame_equal(raw, mdf)

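The same skeleton can be exercised without a Mars installation by swapping the
Mars ``read_csv`` for plain pandas, which is handy for checking the test
scaffolding itself (a hedged stand-in, not the Mars test):

```python
import os
import tempfile

import numpy as np
import pandas as pd


def test_roundtrip_with_plain_pandas():
    with tempfile.TemporaryDirectory() as tempdir:
        file_path = os.path.join(tempdir, "test.csv")
        raw = pd.DataFrame({
            "int": range(10),
            "float": np.random.rand(10),
            "str": [f"value_{i}" for i in range(10)],
        })
        raw.to_csv(file_path, index=False)
        # plain pandas stands in for the Mars read_csv under test
        result = pd.read_csv(file_path)
        pd.testing.assert_frame_equal(raw, result)
```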
When the tests pass locally, we can submit a pull request on GitHub. The test
suite runs automatically on the GitHub Actions and Azure Pipelines continuous
integration services; once all checks pass, the pull request meets the quality
bar for merging.

Documenting Your Code
---------------------

If the change adds APIs to Mars modules, we should document the code in the
``docs`` directory; this can be done following the
:ref:`documentation guide <build_documentation>`.