In [1]:
import io
import os
import pytest

# import dask
from dask.dataframe.io.sql import read_sql_table
from dask.utils import tmpdir, tmpfile
from dask.dataframe.utils import assert_eq

pd = pytest.importorskip("pandas")
dd = pytest.importorskip("dask.dataframe")
pytest.importorskip("sqlalchemy")
pytest.importorskip("sqlite3")
np = pytest.importorskip("numpy")


data = """
name,number,age,negish
Alice,0,33,-5
Bob,1,40,-3
Chris,2,22,3
Dora,3,16,5
Edith,4,53,0
Francis,5,30,0
Garreth,6,20,0
"""

df = pd.read_csv(io.StringIO(data), index_col="number")
df

Unnamed: 0_level_0,name,age,negish
number,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
0,Alice,33,-5
1,Bob,40,-3
2,Chris,22,3
3,Dora,16,5
4,Edith,53,0
5,Francis,30,0
6,Garreth,20,0


In [3]:
npartitions = 2
ddf = dd.from_pandas(df, npartitions)
ddf

Unnamed: 0_level_0,name,age,negish
npartitions=2,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
0,object,int64,int64
4,...,...,...
6,...,...,...


In [5]:
from pathlib import Path
path = Path('/Users/ryan/c/dask/tmp/sqlite')
uri = f'sqlite:///{path}'

In [50]:
if path.exists():
    path.unlink()

ddf.set_index('age').to_sql("test", uri,)

(None, None)

In [33]:
if path.exists():
    path.unlink()

ddf.set_index('name').to_sql("test", uri,)

(None,)

In [44]:
actual = read_sql_table('test', uri, 'age').compute()
df2 = df.set_index('age')
assert_eq(df2, actual)

AssertionError: DataFrame are different

DataFrame shape mismatch
[left]:  (7, 2)
[right]: (14, 3)

In [6]:
ddf.to_sql("test", uri, index=False, if_exists='replace')

(None, None)

In [84]:
def digits(n, b, empty_zero=False, significant_leading_zeros=False):
    if significant_leading_zeros:
        if not empty_zero: return digits(n+1, b, empty_zero=True, significant_leading_zeros=significant_leading_zeros)
        bases = [1]
        while n >= bases[-1]:
            n -= bases[-1]
            bases.append(bases[-1]*b)
        n_digits = digits(n, b, empty_zero=True)
        return [0]*(len(bases)-1-len(n_digits)) + n_digits
    else:
        return digits(n // b, b, empty_zero=True, significant_leading_zeros=False) + [n%b] if n else [] if empty_zero else [0]

In [88]:
N=41
B=2
#
([ ''.join([chr(ord('0')+digit) for digit in digits(n,B, empty_zero=False, significant_leading_zeros=True)]) for n in range(0,N) ],
 [ ''.join([chr(ord('0')+digit) for digit in digits(n,B, empty_zero= True, significant_leading_zeros=True)]) for n in range(0,N) ])

(['0',
  '1',
  '00',
  '01',
  '10',
  '11',
  '000',
  '001',
  '010',
  '011',
  '100',
  '101',
  '110',
  '111',
  '0000',
  '0001',
  '0010',
  '0011',
  '0100',
  '0101',
  '0110',
  '0111',
  '1000',
  '1001',
  '1010',
  '1011',
  '1100',
  '1101',
  '1110',
  '1111',
  '00000',
  '00001',
  '00010',
  '00011',
  '00100',
  '00101',
  '00110',
  '00111',
  '01000',
  '01001',
  '01010'],
 ['',
  '0',
  '1',
  '00',
  '01',
  '10',
  '11',
  '000',
  '001',
  '010',
  '011',
  '100',
  '101',
  '110',
  '111',
  '0000',
  '0001',
  '0010',
  '0011',
  '0100',
  '0101',
  '0110',
  '0111',
  '1000',
  '1001',
  '1010',
  '1011',
  '1100',
  '1101',
  '1110',
  '1111',
  '00000',
  '00001',
  '00010',
  '00011',
  '00100',
  '00101',
  '00110',
  '00111',
  '01000',
  '01001'])

In [89]:
from zarr import tree
from h5py import File

ModuleNotFoundError: No module named 'zarr'

In [10]:
ddf

Unnamed: 0_level_0,name,age,negish
npartitions=2,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
0,object,int64,int64
4,...,...,...
6,...,...,...


In [7]:
read_sql_table('test', uri, 'age').compute()

Unnamed: 0_level_0,number,name,negish
age,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
33,0,Alice,-5
40,1,Bob,-3
22,2,Chris,3
16,3,Dora,5
53,4,Edith,0
30,5,Francis,0
20,6,Garreth,0


In [9]:
read_sql_table('test', uri, 'name').compute()

TypeError: Provided index column is of type "object".  If divisions is not provided the index column type must be numeric or datetime.

In [54]:
ddf.to_delayed()[0]

Delayed(('from_pandas-654c3723d9258bd5c3f8857aa93e1eca', 0))

In [61]:
from math import ceil

In [63]:
nrows

7

In [64]:
npartitions = 10

In [65]:
nrows = len(df)
chunksize = int(ceil(nrows / npartitions))
chunksize

1

In [66]:
df.index.is_monotonic_increasing

True

In [1]:
from dask.dataframe.io.io import sorted_division_locations

In [80]:
divisions, locations = sorted_division_locations(df.index, chunksize=1)
divisions, locations

([0, 1, 2, 3, 4, 5, 6], [0, 1, 2, 3, 4, 5, 7])

In [85]:
dd.from_pandas(df.iloc[:2], npartitions=2)

Unnamed: 0_level_0,name,age,negish
npartitions=1,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
0,object,int64,int64
1,...,...,...


In [89]:
sorted_division_locations(['a']*5 + ['b']*5, npartitions=10)

(['a', 'b', 'b'], [0, 5, 10])

In [3]:
from pandas import DataFrame as DF

In [98]:
df = DF([ {'i': chr(ord('a')+i), 'n': i } for i in range(11) ]).set_index('i')
sorted_division_locations(df.index, chunksize=2)

(['a', 'c', 'e', 'g', 'i', 'k'], [0, 2, 4, 6, 8, 11])

In [5]:
df = DF([ {'i': chr(ord('a')+i), 'n': i } for i in range(1) ]).set_index('i')
sorted_division_locations(df.index, npartitions=2)

(['a'], [1])

In [87]:
[ 
    sorted_division_locations(
        [
            chr(ord('a')+i) 
            for i in range(n)
        ], 
        chunksize=1
    ) 
    for n in range(2, 10) 
]

[(['a', 'b'], [0, 2]),
 (['a', 'b', 'c'], [0, 1, 3]),
 (['a', 'b', 'c', 'd'], [0, 1, 2, 4]),
 (['a', 'b', 'c', 'd', 'e'], [0, 1, 2, 3, 5]),
 (['a', 'b', 'c', 'd', 'e', 'f'], [0, 1, 2, 3, 4, 6]),
 (['a', 'b', 'c', 'd', 'e', 'f', 'g'], [0, 1, 2, 3, 4, 5, 7]),
 (['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h'], [0, 1, 2, 3, 4, 5, 6, 8]),
 (['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i'], [0, 1, 2, 3, 4, 5, 6, 7, 9])]

In [88]:
[ 
    sorted_division_locations(
        [
            chr(ord('a')+i) 
            for i in range(n)
        ], 
        chunksize=2
    ) 
    for n in range(2, 10) 
]

[(['a', 'b'], [0, 2]),
 (['a', 'c'], [0, 3]),
 (['a', 'c', 'd'], [0, 2, 4]),
 (['a', 'c', 'e'], [0, 2, 5]),
 (['a', 'c', 'e', 'f'], [0, 2, 4, 6]),
 (['a', 'c', 'e', 'g'], [0, 2, 4, 7]),
 (['a', 'c', 'e', 'g', 'h'], [0, 2, 4, 6, 8]),
 (['a', 'c', 'e', 'g', 'i'], [0, 2, 4, 6, 9])]

In [71]:
data = df

In [73]:
dsk = {
        ('foo', i): data.iloc[start:stop]
        for i, (start, stop) in enumerate(zip(locations[:-1], locations[1:]))
    }

In [79]:
dsk[('foo',0)]

Unnamed: 0_level_0,name,age,negish
number,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
0,Alice,33,-5
1,Bob,40,-3
2,Chris,22,3
3,Dora,16,5
4,Edith,53,0
5,Francis,30,0
6,Garreth,20,0


In [55]:
ddf = dd.from_pandas(df, 10); ddf

Unnamed: 0_level_0,name,age,negish
npartitions=6,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
0,object,int64,int64
1,...,...,...
...,...,...,...
5,...,...,...
6,...,...,...


In [59]:
df

Unnamed: 0_level_0,name,age,negish
number,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
0,Alice,33,-5
1,Bob,40,-3
2,Chris,22,3
3,Dora,16,5
4,Edith,53,0
5,Francis,30,0
6,Garreth,20,0


In [58]:
[ len(d.compute()) for d in ddf.to_delayed() ]

[1, 1, 1, 1, 1, 2]

In [35]:
actual

Unnamed: 0_level_0,name,negish
age,Unnamed: 1_level_1,Unnamed: 2_level_1
33,Alice,-5
40,Bob,-3
22,Chris,3
16,Dora,5
53,Edith,0
30,Francis,0
20,Garreth,0


In [27]:
actual

Unnamed: 0_level_0,name,negish
age,Unnamed: 1_level_1,Unnamed: 2_level_1
16,Dora,5
20,Garreth,0
22,Chris,3
30,Francis,0
33,Alice,-5
40,Bob,-3
53,Edith,0


In [29]:
df.set_index('age')

Unnamed: 0_level_0,name,negish
age,Unnamed: 1_level_1,Unnamed: 2_level_1
33,Alice,-5
40,Bob,-3
22,Chris,3
16,Dora,5
53,Edith,0
30,Francis,0
20,Garreth,0


In [25]:
df2

Unnamed: 0_level_0,name,negish
age,Unnamed: 1_level_1,Unnamed: 2_level_1
33,Alice,-5
40,Bob,-3
22,Chris,3
16,Dora,5
53,Edith,0
30,Francis,0
20,Garreth,0
