
append error #31

Open · flamby opened this issue Nov 5, 2019 · 5 comments


flamby commented Nov 5, 2019

Hello,

Do you have any recommendations regarding importing data from arctic?
I'm currently using cryptostore with arctic as a backend.
Cryptostore is by the very same author as arctic, but loading trades as a dataframe takes too much time with it.

For now, this is what I did:

import pystore
from arctic import Arctic

exchange = "BITFINEX"
datastore = "mydatastore"

arctic_store = Arctic("localhost")
arctic_lib = arctic_store[exchange]
symbols = arctic_lib.list_symbols()

store = pystore.store(datastore)
collection = store.collection(exchange)
for symbol in symbols:
    df_src = arctic_lib.read(symbol)
    if symbol in collection.list_items():
        item = collection.item(symbol)
        df_dst = item.to_pandas()
        # https://stackoverflow.com/a/44318806
        df_diff = df_src[~df_src.index.isin(df_dst.index)]
        rows, columns = df_diff.shape
        if df_diff.empty:
            print("No new row to append...")
        else:
            print(f"Appending {rows} rows to {symbol} item")
            collection.append(symbol, df_diff)
    else:
        rows, columns = df_src.shape
        print(f"Importing {symbol} for the first time w/ {rows} rows and {columns} columns")
        collection.write(symbol, df_src, metadata={'source': 'cryptostore'})

But I'm facing errors similar to #16 when the append happens, even after rolling back dask and fastparquet to previous releases.

    raise ValueError("Exactly one of npartitions and chunksize must be specified.")
ValueError: Exactly one of npartitions and chunksize must be specified.

My setup:

dask==2.6.0
fastparquet==0.3.2
numba==0.46.0

Thanks, and keep up the good work!

flamby changed the title from "Question: Importing data from arctic (via cryptostore) guideline" to "append error" on Nov 6, 2019

flamby commented Nov 6, 2019

It seems one has to retrieve npartitions from the original dask dataframe and pass it to append.
So I fixed it this way:

collection.append(symbol, df_diff, npartitions=item.data.npartitions)

Will it work every time?
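
For reference, here is roughly how the workaround slots into the import loop from the first comment (a sketch only, assuming the item already exists in the collection):

if symbol in collection.list_items():
    item = collection.item(symbol)
    df_dst = item.to_pandas()
    df_diff = df_src[~df_src.index.isin(df_dst.index)]
    if not df_diff.empty:
        # pass the existing item's partition count so pystore does not end up
        # calling dd.from_pandas(..., npartitions=None) internally
        collection.append(symbol, df_diff, npartitions=item.data.npartitions)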


viveksethu commented Dec 7, 2019

Thank you @flamby!

This fix is working; thanks for sharing and saving time for others.

The pystore notebook demo also only works with this fix; otherwise it throws an error:

ValueError: Exactly one of npartitions and chunksize must be specified.

Great thanks to @ranaroussi for this wonderful library.


XBKZ commented Dec 15, 2019

Thank you to @ranaroussi for these nice libraries, and thank you to @flamby who fixed this nasty bug in the Windows 10 environment! I had exactly the same message ("Exactly one of npartitions and chunksize must be specified") and the append was impossible. Now it works. Thank you again.


yohplala commented Jan 9, 2020

Hello, same here (Win10 environment)!
Thanks for the fix @flamby!


JugglingNumbers commented Feb 20, 2020

The problem is that dd.from_pandas() checks:

if (npartitions is None) == (chunksize is None):
    raise ValueError("Exactly one of npartitions and chunksize must be specified.")

So when the append function calls dd.from_pandas(df, npartitions=None) it raises the error, but if you call dd.from_pandas(df, npartitions=None, chunksize=100000) it works. Presumably dask uses npartitions=1 as its default, even though the API says npartitions is optional and doesn't list a default.
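
A quick way to see that check in isolation (assuming the dask 2.x behavior described above):

import pandas as pd
import dask.dataframe as dd

df = pd.DataFrame({"price": [1.0, 2.0, 3.0]})

# dd.from_pandas(df)                        # neither argument given -> ValueError
ddf = dd.from_pandas(df, npartitions=1)     # exactly one argument given -> fine
ddf = dd.from_pandas(df, chunksize=100000)  # also fine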

The code below is what needs to be tweaked. The new variable could be set to use npartitions=1 (new = dd.from_pandas(data, npartitions=1)), since this will be superseded by the passed value after the dataframes are combined. I'm willing to bet Ran comes up with a more elegant solution, though.

# combine old dataframe with new
current = self.item(item)
new = dd.from_pandas(data, npartitions=npartitions)
# combined = current.data.append(new)
combined = dd.concat([current.data, new]).drop_duplicates(keep="last")

if npartitions is None:
    memusage = combined.memory_usage(deep=True).sum()
    if isinstance(combined, dd.DataFrame):
        memusage = memusage.compute()
    npartitions = int(1 + memusage // DEFAULT_PARTITION_SIZE)
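
That is, the suggested change applied to the excerpt above (a sketch only, not the maintainer's actual fix):

# combine old dataframe with new
current = self.item(item)
# give dask an explicit partition count here; when no npartitions was passed in,
# the value for the combined frame is still recomputed from memory usage below
new = dd.from_pandas(data, npartitions=1)
combined = dd.concat([current.data, new]).drop_duplicates(keep="last")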
