In [None]:
mode = "pandas"

# Initialisation

# Object Creation

Creating a `VSeries`

In [2]:
s = vdf.VSeries([1,2,3,None,4],npartitions=2)
s.compute()

0       1
1       2
2       3
3    <NA>
4       4
dtype: int64

Creating a `VDataFrame` by specifying values for each column.

In [3]:
df = vdf.VDataFrame({'a': list(range(20)),
                 'b': list(reversed(range(20))),
                 'c': list(range(20))
                }, npartitions=2)
df.compute()

Unnamed: 0,a,b,c
0,0,19,0
1,1,18,1
2,2,17,2
3,3,16,3
4,4,15,4
5,5,14,5
6,6,13,6
7,7,12,7
8,8,11,8
9,9,10,9


Creating a `VDataFrame` from a pandas `Dataframe`.

> Note that best practice for `VDataFrame` is to read data directly into a ̀`VDataFrame` with something like `read_csv()` (discussed below).

In [4]:
pdf = pd.DataFrame({'a': [0, 1, 2, 3],'b': [0.1, 0.2, 0.3, 0.4]})
vdf.from_pandas(pdf, npartitions=2).compute()

Unnamed: 0,a,b
0,0,0.1
1,1,0.2
2,2,0.3
3,3,0.4


In [5]:
ps = pd.Series([1,2,3,None,4])
vdf.from_pandas(ps, npartitions=2).compute()

0    1.0
1    2.0
2    3.0
3    NaN
4    4.0
dtype: float64

# Viewing Data

Viewing the top rows of a `VDataFrame`.

In [6]:
df.head(2)

Unnamed: 0,a,b,c
0,0,19,0
1,1,18,1


In [7]:
df.sort_values(by='b').compute()

Unnamed: 0,a,b,c
19,19,0,19
18,18,1,18
17,17,2,17
16,16,3,16
15,15,4,15
14,14,5,14
13,13,6,13
12,12,7,12
11,11,8,11
10,10,9,10


# Selection

## Getting
Selecting a single column, which initially yields a `VSeries`.

In [8]:
df['a'].compute()

0      0
1      1
2      2
3      3
4      4
5      5
6      6
7      7
8      8
9      9
10    10
11    11
12    12
13    13
14    14
15    15
16    16
17    17
18    18
19    19
Name: a, dtype: int64

## Selection by Label
Selecting rows from index 2 to index 5 from columns ‘a’ and ‘b’.

In [9]:
df.loc[2:5, ['a', 'b']]

Unnamed: 0_level_0,a,b
npartitions=1,Unnamed: 1_level_1,Unnamed: 2_level_1
2,int64,int64
5,...,...


In [10]:
df.loc[2:5, ['a', 'b']].compute()

Unnamed: 0,a,b
2,2,17
3,3,16
4,4,15
5,5,14


## Selection by Position
Selecting via integers and integer slices, like numpy/pandas.
> Note that this functionality is not available for `dask-cudf`.

In [11]:
df.iloc[:,0:len(df.columns)].compute()

Unnamed: 0,a,b,c
0,0,19,0
1,1,18,1
2,2,17,2
3,3,16,3
4,4,15,4
5,5,14,5
6,6,13,6
7,7,12,7
8,8,11,8
9,9,10,9


In [12]:
s[3:5].compute()

3    <NA>
4       4
dtype: int64

## Boolean Indexing
Selecting rows in a `VDataFrame` or `VSeries` by direct `Boolean` indexing.

In [13]:
df[df.b > 15].compute()

Unnamed: 0,a,b,c
0,0,19,0
1,1,18,1
2,2,17,2
3,3,16,3


Selecting values from a `DataFrame` where a `Boolean` condition is met, via the query API.

You can pass local variables to queries, via the `local_dict` keyword.
Supported logical operators include >, <, >=, <=, ==, and !=.

In [14]:
value = 3
df.query("b == @value",local_dict={'value':value}).compute()

Unnamed: 0,a,b,c
16,16,3,16


In [15]:
value = 3
df.query("b == @val", local_dict={'val':value}).compute()

Unnamed: 0,a,b,c
16,16,3,16


Using the isin method for filtering.

In [16]:
df[df.a.isin([0, 5])].compute()

Unnamed: 0,a,b,c
0,0,19,0
5,5,14,5


# Missing Data

Missing data can be replaced by using the fillna method.

In [17]:
s.fillna(999).compute()

0      1
1      2
2      3
3    999
4      4
dtype: int64

# Operations

## Stats
Calculating descriptive statistics for a Series.

In [18]:
vdf.compute(s.mean())[0], vdf.compute(s.var())[0]

(2.5, 1.6666666666666667)

## Applymap

Applying functions to a Series.
Note that applying user defined functions directly with Dask-cuDF is not yet implemented.
For now, you can use map_partitions to apply a function to each partition of the distributed dataframe.

In [19]:
def add_ten(num:np.int64):
    return num + 10

#df['a'].apply(add_ten,meta=('a', 'int64')).compute()
df['a'].apply(add_ten).compute()

You did not provide metadata, so Dask is running your function on a small dataset to guess output types. It is possible that Dask will guess incorrectly.
To provide an explicit output types or to silence this message, please provide the `meta=` keyword, as described in the map or apply function that you are using.
  Before: .apply(func)
  After:  .apply(func, meta=('a', 'int64'))



0     10
1     11
2     12
3     13
4     14
5     15
6     16
7     17
8     18
9     19
10    20
11    21
12    22
13    23
14    24
15    25
16    26
17    27
18    28
19    29
Name: a, dtype: int64

In [20]:
# Not with pandas
if vdf.VDF_MODE in (vdf.Mode.dask, vdf.Mode.dask_cudf):
    df['a'].map_partitions(add_ten).compute()

## Histogramming

Counting the number of occurrences of each unique value of variable.

In [21]:
df.a.value_counts().compute()

15    1
6     1
1     1
14    1
2     1
5     1
11    1
7     1
17    1
13    1
8     1
16    1
0     1
10    1
4     1
9     1
19    1
18    1
3     1
12    1
Name: a, dtype: int64

## String Methods

Virtual Dataframe provides string processing methods in the str attribute of Series.

In [22]:
s = vdf.VSeries(['A', 'B', 'C', 'Aaba', 'Baca', None, 'CABA', 'dog', 'cat'], npartitions=2)
s.str.lower().compute()

0       a
1       b
2       c
3    aaba
4    baca
5    <NA>
6    caba
7     dog
8     cat
dtype: object

# Concat

Concatenating VSeries and VDataFrames row-wise.

In [23]:
s = vdf.VSeries([1, 2, 3, None, 5],npartitions=2)
vdf.concat([s, s]).compute()

0       1
1       2
2       3
3    <NA>
4       5
0       1
1       2
2       3
3    <NA>
4       5
dtype: int64

# Join

Performing SQL style merges.
Note that the dataframe order is not maintained, but may be restored post-merge by sorting by the index.

In [24]:
df_a = vdf.VDataFrame({"key":['a', 'b', 'c', 'd', 'e'], "vals_a":[float(i + 10) for i in range(5)]})

df_b = vdf.VDataFrame({"key":['a', 'c', 'e'],"vals_b":[float(i+100) for i in range(3)]})

merged = df_a.merge(df_b, on=['key'], how='left')
merged.compute()

Unnamed: 0,key,vals_a,vals_b
0,a,10.0,100.0
1,e,14.0,102.0
2,c,12.0,101.0
3,b,11.0,
4,d,13.0,


# Grouping

Virtual Dataframe support the Split-Apply-Combine groupby paradigm.

In [25]:
import cudf
import dask_cudf
df = vdf.VDataFrame({'a': list(range(20)),
                     'b': list(reversed(range(20))),
                     'c': list(range(20))
                    })

df['agg_col1'] = pd.Series([1 if x % 2 == 0 else 0 for x in range(len(df))])
df['agg_col2'] = pd.Series([1 if x % 3 == 0 else 0 for x in range(len(df))])

In [26]:
ddf = vdf.from_virtual(df.compute(), npartitions=2)
ddf.compute()

Unnamed: 0,a,b,c,agg_col1,agg_col2
0,0,19,0,1,1
1,1,18,1,0,0
2,2,17,2,1,0
3,3,16,3,0,1
4,4,15,4,1,0
5,5,14,5,0,0
6,6,13,6,1,1
7,7,12,7,0,0
8,8,11,8,1,0
9,9,10,9,0,1


Grouping and then applying the sum function to the grouped data.

In [27]:
df.groupby('agg_col1').sum().compute()

Unnamed: 0_level_0,a,b,c,agg_col2
agg_col1,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
1,90,100,90,4
0,100,90,100,3


Grouping hierarchically then applying the sum function to grouped data.

In [28]:
df.groupby(['agg_col1', 'agg_col2']).sum().compute()

Unnamed: 0_level_0,Unnamed: 1_level_0,a,b,c
agg_col1,agg_col2,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
1,0,54,60,54
0,0,73,60,73
1,1,36,40,36
0,1,27,30,27


Grouping and applying statistical functions to specific columns, using agg.

In [29]:
df.groupby('agg_col1').agg({'a':'max', 'b':'mean', 'c':'sum'}).compute()

Unnamed: 0_level_0,a,b,c
agg_col1,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
1,18,10.0,90
0,19,9.0,100


# Categoricals

`VDataFrames` support categorical columns.

In [30]:
import virtual_dataframe as vdf
# String Arrays is not yet implemented in dask_cudf
#gdf = vdf.VDataFrame({"id": [1, 2, 3, 4, 5, 6], "grade":['a', 'b', 'b', 'a', 'a', 'e']}, npartitions=2)
gdf = vdf.VDataFrame({"id": [1, 2, 3, 4, 5, 6], "grade":[101, 102, 103, 104, 105, 106]}, npartitions=2)
gdf['grade'] = gdf['grade'].astype('category')
if vdf.VDF_MODE in (vdf.Mode.dask,vdf.Mode.dask_cudf):
    gdf['grade'] = gdf.grade.cat.as_known()
gdf.dtypes

id          int64
grade    category
dtype: object

Accessing the categories of a column.

> Note that this is currently not supported in `dask-cudf`.

Accessing the underlying code values of each categorical observation.

In [31]:
gdf.categorize().compute()

Unnamed: 0,id,grade
0,1,101
1,2,102
2,3,103
3,4,104
4,5,105
5,6,106


In [32]:
gdf.grade.cat.codes.compute()

0    0
1    1
2    2
3    3
4    4
5    5
dtype: uint8

# Converting Data Representation

## Pandas
Converting a `VDataFrame` to a pandas DataFrame.

In [33]:
df.head().to_pandas()

Unnamed: 0,a,b,c,agg_col1,agg_col2
0,0,19,0,1,1
1,1,18,1,0,0
2,2,17,2,1,0
3,3,16,3,0,1
4,4,15,4,1,0


## Numpy
Converting a `VDataFrame` to a numpy ndarray.

In [34]:
df.to_numpy()

array([[ 0, 19,  0,  1,  1],
       [ 1, 18,  1,  0,  0],
       [ 2, 17,  2,  1,  0],
       [ 3, 16,  3,  0,  1],
       [ 4, 15,  4,  1,  0],
       [ 5, 14,  5,  0,  0],
       [ 6, 13,  6,  1,  1],
       [ 7, 12,  7,  0,  0],
       [ 8, 11,  8,  1,  0],
       [ 9, 10,  9,  0,  1],
       [10,  9, 10,  1,  0],
       [11,  8, 11,  0,  0],
       [12,  7, 12,  1,  1],
       [13,  6, 13,  0,  0],
       [14,  5, 14,  1,  0],
       [15,  4, 15,  0,  1],
       [16,  3, 16,  1,  0],
       [17,  2, 17,  0,  0],
       [18,  1, 18,  1,  1],
       [19,  0, 19,  0,  0]])

Converting a `VSeries` to a numpy ndarray.

In [35]:
df['a'].to_numpy()

array([ 0,  1,  2,  3,  4,  5,  6,  7,  8,  9, 10, 11, 12, 13, 14, 15, 16,
       17, 18, 19])

# Getting Data In/Out

## CSV
Writing to a CSV file.

In [36]:
import shutil
if os.path.exists('example_output'):
    shutil.rmtree('example_output')
os.mkdir('example_output')
#df.to_csv('example_output/foo.csv', single_file=True)
df.to_csv('example_output/foo*.csv')

['/home/pprados/workspace.bda/virtual_dataframe/notebooks/example_output/foo0.csv']

In [37]:
!ls example_output

foo0.csv


Reading from a csv file.

In [38]:
type(df)

dask_cudf.core.DataFrame

In [39]:
df = vdf.read_csv('example_output/foo*.csv')
df.compute()

Unnamed: 0.1,Unnamed: 0,a,b,c,agg_col1,agg_col2
0,0,0,19,0,1,1
1,1,1,18,1,0,0
2,2,2,17,2,1,0
3,3,3,16,3,0,1
4,4,4,15,4,1,0
5,5,5,14,5,0,0
6,6,6,13,6,1,1
7,7,7,12,7,0,0
8,8,8,11,8,1,0
9,9,9,10,9,0,1


In [40]:
type(df)

dask.dataframe.core.DataFrame

Reading all CSV files in a directory into a single dask_cudf.DataFrame, using the star wildcard.
> Not implemented in pandas or cudf ?

In [41]:
# FIXME: extends pandas
if vdf.VDF_MODE in (vdf.Mode.dask, vdf.Mode.dask_cudf):
    df = vdf.read_csv('example_output/*.csv')
    df.compute()

## Parquet
Writing to parquet files, using the CPU via PyArrow.

In [42]:
df.to_parquet('example_output/temp_parquet')