Reformatting with yapf (#58)
* Fixing formatting, adding travis test for yapf

* Fix bug

* Fix rebase
devin-petersohn authored and simon-mo committed Jul 25, 2018
1 parent f7fda88 commit 51ce86f
Showing 19 changed files with 2,684 additions and 1,563 deletions.
1 change: 1 addition & 0 deletions .travis.yml
@@ -25,6 +25,7 @@ matrix:
       env: LINT=1
       script:
         - export PATH="$HOME/miniconda/bin:$PATH"
+        - yapf -dr modin/pandas
         - flake8 .

 install:
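For reference, `yapf -dr` prints a unified diff for every file that deviates from the configured style and exits non-zero when any file would change, so this Travis `script` step fails the build exactly like the `flake8` step below it. A minimal Python sketch of the same check, assuming only that yapf is installed (`FormatFile` with `print_diff=True` is yapf's API for producing that diff; the directory walk is illustrative and not part of this commit):

    import os
    import sys

    from yapf.yapflib.yapf_api import FormatFile


    def check_tree(root):
        """Return True if any .py file under root would be reformatted."""
        dirty = False
        for dirpath, _, filenames in os.walk(root):
            for name in filenames:
                if name.endswith(".py"):
                    path = os.path.join(dirpath, name)
                    # print_diff=True asks yapf for the diff instead of
                    # rewriting the file; `changed` flags a style violation.
                    diff, _, changed = FormatFile(path, print_diff=True)
                    if changed:
                        print(diff)
                        dirty = True
        return dirty


    if __name__ == "__main__":
        sys.exit(1 if check_tree("modin/pandas") else 0)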
2 changes: 1 addition & 1 deletion .travis/install-dependencies.sh
@@ -55,7 +55,7 @@ elif [[ "$LINT" == "1" ]]; then
     bash miniconda.sh -b -p $HOME/miniconda
     export PATH="$HOME/miniconda/bin:$PATH"
     # Install Python linting tools.
-    pip install -q flake8 flake8-comprehensions
+    pip install -q flake8 flake8-comprehensions yapf
 else
     echo "Unrecognized environment."
     exit 1
29 changes: 17 additions & 12 deletions modin/pandas/__init__.py
@@ -15,8 +15,10 @@

 try:
     if threading.current_thread().name == "MainThread":
-        ray.init(redirect_output=True, include_webui=False,
-                 redirect_worker_output=True)
+        ray.init(
+            redirect_output=True,
+            include_webui=False,
+            redirect_worker_output=True)
 except AssertionError:
     pass

@@ -41,17 +43,20 @@ def get_npartitions():
 from .concat import concat  # noqa: 402
 from .dataframe import DataFrame  # noqa: 402
 from .datetimes import to_datetime  # noqa: 402
-from .io import (read_csv, read_parquet, read_json, read_html,  # noqa: 402
-                 read_clipboard, read_excel, read_hdf, read_feather,  # noqa: 402
-                 read_msgpack, read_stata, read_sas, read_pickle,  # noqa: 402
-                 read_sql)  # noqa: 402
+from .io import (  # noqa: 402
+    read_csv, read_parquet, read_json, read_html, read_clipboard, read_excel,
+    read_hdf, read_feather, read_msgpack, read_stata, read_sas, read_pickle,
+    read_sql)
 from .reshape import get_dummies  # noqa: 402

 __all__ = [
-    "DataFrame", "Series", "read_csv", "read_parquet", "concat", "eval",
-    "unique", "value_counts", "cut", "to_numeric", "factorize", "test", "qcut",
-    "match", "to_datetime", "get_dummies", "Panel", "date_range", "Index",
-    "MultiIndex", "Series", "bdate_range", "DatetimeIndex", "to_timedelta",
-    "set_eng_float_format", "set_option", "CategoricalIndex", "Timedelta",
-    "Timestamp", "NaT", "PeriodIndex", "Categorical"
+    "DataFrame", "Series", "read_csv", "read_parquet", "read_json",
+    "read_html", "read_clipboard", "read_excel", "read_hdf", "read_feather",
+    "read_msgpack", "read_stata", "read_sas", "read_pickle", "read_sql",
+    "concat", "eval", "unique", "value_counts", "cut", "to_numeric",
+    "factorize", "test", "qcut", "match", "to_datetime", "get_dummies",
+    "Panel", "date_range", "Index", "MultiIndex", "Series", "bdate_range",
+    "DatetimeIndex", "to_timedelta", "set_eng_float_format", "set_option",
+    "CategoricalIndex", "Timedelta", "Timestamp", "NaT", "PeriodIndex",
+    "Categorical"
 ]
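A side effect of the expanded `__all__` above: every `read_*` entry point is now part of the public namespace, so a star-import of `modin.pandas` exposes the full set of readers rather than just `read_csv` and `read_parquet`. A hypothetical usage sketch (the JSON path is made up):

    # Illustrative only: with the expanded __all__, a star-import now picks up
    # every reader (read_json, read_excel, read_sql, ...).
    from modin.pandas import *  # noqa: F401,F403

    df = read_json("records.json")  # hypothetical file; exported via __all__
    print(df.head())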
100 changes: 59 additions & 41 deletions modin/pandas/concat.py
@@ -10,8 +10,15 @@
 from .utils import _reindex_helper


-def concat(objs, axis=0, join='outer', join_axes=None, ignore_index=False,
-           keys=None, levels=None, names=None, verify_integrity=False,
+def concat(objs,
+           axis=0,
+           join='outer',
+           join_axes=None,
+           ignore_index=False,
+           keys=None,
+           levels=None,
+           names=None,
+           verify_integrity=False,
            copy=True):

     if keys is not None:
@@ -28,24 +35,24 @@ def concat(objs, axis=0, join='outer', join_axes=None, ignore_index=False,
         raise ValueError("All objects passed were None")

     try:
-        type_check = next(obj for obj in objs
-                          if not isinstance(obj, (pandas.Series,
-                                                  pandas.DataFrame,
-                                                  DataFrame)))
+        type_check = next(
+            obj for obj in objs
+            if not isinstance(obj, (pandas.Series, pandas.DataFrame,
+                                    DataFrame)))
     except StopIteration:
         type_check = None
     if type_check is not None:
-        raise ValueError("cannot concatenate object of type \"{0}\"; only "
-                         "pandas.Series, pandas.DataFrame, "
-                         "and modin.pandas.DataFrame objs are "
-                         "valid", type(type_check))
+        raise ValueError(
+            "cannot concatenate object of type \"{0}\"; only "
+            "pandas.Series, pandas.DataFrame, "
+            "and modin.pandas.DataFrame objs are "
+            "valid", type(type_check))

-    all_series = all(isinstance(obj, pandas.Series)
-                     for obj in objs)
+    all_series = all(isinstance(obj, pandas.Series) for obj in objs)
     if all_series:
-        return DataFrame(pandas.concat(objs, axis, join, join_axes,
-                                       ignore_index, keys, levels, names,
-                                       verify_integrity, copy))
+        return DataFrame(
+            pandas.concat(objs, axis, join, join_axes, ignore_index, keys,
+                          levels, names, verify_integrity, copy))

     if isinstance(objs, dict):
         raise NotImplementedError(
@@ -59,8 +66,8 @@ def concat(objs, axis=0, join='outer', join_axes=None, ignore_index=False,
                          " other axis")

     # We need this in a list because we use it later.
-    all_index, all_columns = list(zip(*[(obj.index, obj.columns)
-                                        for obj in objs]))
+    all_index, all_columns = list(
+        zip(*[(obj.index, obj.columns) for obj in objs]))

     def series_to_df(series, columns):
         df = pandas.DataFrame(series)
@@ -71,8 +78,10 @@ def series_to_df(series, columns):
     # true regardless of the existence of another column named 0 in the
     # concat.
     if axis == 0:
-        objs = [series_to_df(obj, [0])
-                if isinstance(obj, pandas.Series) else obj for obj in objs]
+        objs = [
+            series_to_df(obj, [0]) if isinstance(obj, pandas.Series) else obj
+            for obj in objs
+        ]
     else:
         # Pandas starts the count at 0 so this will increment the names as
         # long as there's a new nameless Series being added.
@@ -82,9 +91,11 @@ def name_incrementer(i):
             return val

         i = [0]
-        objs = [series_to_df(obj, obj.name if obj.name is not None
-                             else name_incrementer(i))
-                if isinstance(obj, pandas.Series) else obj for obj in objs]
+        objs = [
+            series_to_df(
+                obj, obj.name if obj.name is not None else name_incrementer(i))
+            if isinstance(obj, pandas.Series) else obj for obj in objs
+        ]

     # Using concat on the columns and index is fast because they're empty,
     # and it forces the error checking. It also puts the columns in the
@@ -105,31 +116,38 @@ def name_incrementer(i):

     # Put all of the DataFrames into Ray format
     # TODO just partition the DataFrames instead of building a new Ray DF.
-    objs = [DataFrame(obj) if isinstance(obj, (pandas.DataFrame,
-                                               pandas.Series)) else obj
-            for obj in objs]
+    objs = [
+        DataFrame(obj)
+        if isinstance(obj, (pandas.DataFrame, pandas.Series)) else obj
+        for obj in objs
+    ]

     # Here we reuse all_columns/index so we don't have to materialize objects
     # from remote memory built in the previous line. In the future, we won't be
     # building new DataFrames, rather just partitioning the DataFrames.
     if axis == 0:
-        new_blocks = np.array([_reindex_helper._submit(
-            args=tuple([all_columns[i], final_columns, axis,
-                        len(objs[0]._block_partitions)] + part.tolist()),
-            num_return_vals=len(objs[0]._block_partitions))
-            for i in range(len(objs))
-            for part in objs[i]._block_partitions])
+        new_blocks = np.array([
+            _reindex_helper._submit(
+                args=tuple([
+                    all_columns[i], final_columns, axis,
+                    len(objs[0]._block_partitions)
+                ] + part.tolist()),
+                num_return_vals=len(objs[0]._block_partitions))
+            for i in range(len(objs)) for part in objs[i]._block_partitions
+        ])
     else:
         # Transposing the columns is necessary because the remote task treats
         # everything like rows and returns in row-major format. Luckily, this
         # operation is cheap in numpy.
-        new_blocks = np.array([_reindex_helper._submit(
-            args=tuple([all_index[i], final_index, axis,
-                        len(objs[0]._block_partitions.T)] + part.tolist()),
-            num_return_vals=len(objs[0]._block_partitions.T))
-            for i in range(len(objs))
-            for part in objs[i]._block_partitions.T]).T
+        new_blocks = np.array([
+            _reindex_helper._submit(
+                args=tuple([
+                    all_index[i], final_index, axis,
+                    len(objs[0]._block_partitions.T)
+                ] + part.tolist()),
+                num_return_vals=len(objs[0]._block_partitions.T))
+            for i in range(len(objs)) for part in objs[i]._block_partitions.T
+        ]).T

-    return DataFrame(block_partitions=new_blocks,
-                     columns=final_columns,
-                     index=final_index)
+    return DataFrame(
+        block_partitions=new_blocks, columns=final_columns, index=final_index)
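Since this commit only re-wraps `concat` into yapf's style, its behavior should be unchanged. A small usage sketch of the code paths shown above, assuming an initialized Ray backend (per the diff, plain `pandas.Series` inputs are accepted and promoted to one-column DataFrames by the `series_to_df` helper when axis=0):

    import modin.pandas as pd

    df1 = pd.DataFrame({"a": [1, 2], "b": [3, 4]})
    df2 = pd.DataFrame({"a": [5, 6], "b": [7, 8]})

    # Row-wise concat (axis=0): columns are outer-joined and each block is
    # reindexed against the final columns via _reindex_helper.
    print(pd.concat([df1, df2]))

    # Column-wise concat (axis=1) goes through the transposed block path
    # (the .T in the else branch above).
    print(pd.concat([df1, df2], axis=1))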
