In [None]:
# Backend used by virtual_dataframe: one of `pandas`, `cudf`, `dask` or `dask-cudf`.
# It is exported as the VDF_MODE environment variable in the next cell.
mode = "pandas"  # RESET THE KERNEL if you change this

# Initialisation

In [None]:
%reload_ext autoreload
%autoreload 2
import os
# Select the mode in `pandas`, `cudf`, `dask` or `dask-cudf`.
# The value comes from `mode` in the first cell. It is exported before
# virtual_dataframe is imported, presumably because the library reads it at
# import time (hence the kernel-reset warning).
os.environ["VDF_MODE"] = mode  # RESET THE KERNEL if you change this
# Scheduler host for the dask modes; presumably consumed by `vdf.VClient()` below.
os.environ["DASK_SCHEDULER_SERVICE_HOST"]="localhost"

# Alternative package name, kept for reference only:
#import cardif_dask as vdf  # Import Virtual Dataframe
import virtual_dataframe as vdf
import pandas as pd
import numpy as np

# Report which backend is active.
print(f"Use {vdf.VDF_MODE.name.upper()}")

In [None]:
# Create the virtual_dataframe client and display it.
# NOTE(review): presumably a dask distributed client in the dask modes;
# confirm what it does in the `pandas`/`cudf` modes.
client = vdf.VClient()
client

# Object Creation

Creating a `VSeries`

In [None]:
# A VSeries containing one missing value (`None`), requested with 2 partitions;
# `.compute()` returns the concrete result for display.
s = vdf.VSeries([1,2,3,None,4],npartitions=2)
s.compute()

Creating a `VDataFrame` by specifying values for each column.

In [None]:
# Three integer columns of 20 rows each:
#   a: 0..19 ascending, b: 19..0 descending, c: 0..19 ascending.
df = vdf.VDataFrame(
    {
        'a': list(range(20)),
        'b': list(range(19, -1, -1)),
        'c': list(range(20)),
    },
    npartitions=2,
)
df.compute()

Creating a `VDataFrame` from a pandas `DataFrame`.

> Note that best practice for `VDataFrame` is to read data directly into a `VDataFrame` with something like `read_csv()` (discussed below).

In [None]:
# Wrap an existing pandas DataFrame, requesting 2 partitions.
pdf = pd.DataFrame({'a': [0, 1, 2, 3],'b': [0.1, 0.2, 0.3, 0.4]})
vdf.from_pandas(pdf, npartitions=2).compute()

In [None]:
# The same conversion works for a pandas Series.
ps = pd.Series([1,2,3,None,4])
vdf.from_pandas(ps, npartitions=2).compute()

# Viewing Data

Viewing the top rows of a `VDataFrame`.

In [None]:
# First two rows only; note that no `.compute()` is called on the result here.
df.head(2)

In [None]:
# Rows ordered by the values of column `b`.
df.sort_values(by='b').compute()

# Selection

## Getting
Selecting a single column, which initially yields a `VSeries`.

In [None]:
# Indexing with one column name yields a VSeries.
df['a'].compute()

## Selection by Label
Selecting rows from index 2 to index 5 from columns ‘a’ and ‘b’.

In [None]:
# Without `.compute()` the selection object itself is displayed
# (presumably an unevaluated/lazy object in the dask modes — confirm).
df.loc[2:5, ['a', 'b']]

In [None]:
# Same label-based selection, computed. As in pandas, `.loc` slices include both endpoints.
df.loc[2:5, ['a', 'b']].compute()

## Selection by Position
Selecting via integers and integer slices, like numpy/pandas.
> Note that this functionality is not available for `dask-cudf`.

In [None]:
# All rows, and every column selected by position.
df.iloc[:,0:len(df.columns)].compute()

In [None]:
# Elements of `s` selected with the slice `3:5`.
s[3:5].compute()

## Boolean Indexing
Selecting rows in a `VDataFrame` or `VSeries` by direct `Boolean` indexing.

In [None]:
# Keep only the rows where column `b` is greater than 15.
df[df.b > 15].compute()

Selecting values from a `VDataFrame` where a `Boolean` condition is met, via the query API.

You can pass local variables to queries, via the `local_dict` keyword.
Supported comparison operators include >, <, >=, <=, ==, and !=.

In [None]:
# `@value` in the query string is resolved through the `local_dict` mapping.
value = 3
df.query("b == @value",local_dict={'value':value}).compute()

In [None]:
# The name after `@` must match a key of `local_dict` (`val`), not the Python variable name.
value = 3
df.query("b == @val", local_dict={'val':value}).compute()

Using the isin method for filtering.

In [None]:
# Rows whose `a` value is either 0 or 5.
df[df.a.isin([0, 5])].compute()

# Missing Data

Missing data can be replaced by using the fillna method.

In [None]:
# Replace the missing value in `s` with 999.
s.fillna(999).compute()

# Operations

## Stats
Calculating descriptive statistics for a Series.

In [None]:
# Mean and variance of `s`. `vdf.compute(...)` returns an indexable container
# (presumably a tuple, like `dask.compute`), hence the `[0]`.
# Each reduction is computed in a separate call.
vdf.compute(s.mean())[0], vdf.compute(s.var())[0]

## Apply

Applying functions to a Series.
Note that applying user defined functions directly with Dask-cuDF is not yet implemented.
For now, you can use map_partitions to apply a function to each partition of the distributed dataframe.

In [None]:
def add_ten(num):
    """Return ``num + 10``.

    The function is used both element-wise (``Series.apply``) and
    partition-wise (``Series.map_partitions``). So ``num`` may be a scalar or
    a whole Series partition, and ``+`` broadcasts in the latter case. It is
    therefore deliberately not annotated as a scalar type.
    """
    return num + 10

# Element-wise application of `add_ten` to column `a`.
df['a'].apply(add_ten).compute()

In [None]:
# Display `df` again for reference.
df.compute()

In [None]:
# Call `add_ten` once per partition of column `a`, rather than once per element.
df['a'].map_partitions(add_ten).compute()

In [None]:
# Double every value of the frame.
# NOTE(review): pandas >= 2.1 deprecates `DataFrame.applymap` in favour of
# `DataFrame.map`; check whether all vdf backends support the latter before switching.
df.applymap(lambda x: x*2).compute()

In [None]:
# Row kernel for `apply_rows` below: `a_s`, `b_s` and `c_s` receive columns
# a, b and c (per the `incols` mapping); `val` comes from `kwargs`. The kernel
# fills the pre-allocated output column `outs` in place with (a + b + c) * val.
# NOTE(review): in cudf mode this is presumably compiled (numba), so keep the
# loop in this simple enumerate/zip form — confirm before restructuring.
def my_kernel(a_s, b_s, c_s, val, outs):
    for i, (a, b, c) in enumerate(zip(a_s,b_s,c_s)):
        outs[i] = (a + b + c ) * val

# Run `my_kernel` over the rows of `df`:
#   incols  - maps DataFrame column names to the kernel's argument names,
#   outcols - declares the output column `outs` and its dtype,
#   kwargs  - extra scalar arguments passed to the kernel.
df.apply_rows(
    my_kernel,
    incols={'a':'a_s','b':'b_s','c':'c_s'},
    outcols={'outs': np.float64},
    kwargs={"val":3.0}
).compute()

## Histogramming

Counting the number of occurrences of each unique value in a column.

In [None]:
# Occurrence count of each distinct value of column `a`.
df.a.value_counts().compute()

## String Methods

Virtual Dataframe provides string processing methods through the `str` accessor of `VSeries`.

In [None]:
# String series (with one missing value); lower-case it via the `.str` accessor.
s = vdf.VSeries(['A', 'B', 'C', 'Aaba', 'Baca', None, 'CABA', 'dog', 'cat'], npartitions=2)
s.str.lower().compute()

# Concat

Concatenating VSeries and VDataFrames row-wise.

In [None]:
# Concatenate `s` with itself row-wise.
s = vdf.VSeries([1, 2, 3, None, 5],npartitions=2)
vdf.concat([s, s]).compute()

# Join

Performing SQL style merges.
Note that the dataframe order is not maintained, but may be restored post-merge by sorting by the index.

In [None]:
# Left table: five keys with float values 10.0 .. 14.0.
df_a = vdf.VDataFrame({"key": ['a', 'b', 'c', 'd', 'e'],
                       "vals_a": [float(v) for v in range(10, 15)]})

# Right table: only keys a, c and e, with float values 100.0 .. 102.0.
df_b = vdf.VDataFrame({"key": ['a', 'c', 'e'],
                       "vals_b": [float(v) for v in range(100, 103)]})

# A left join keeps every row of df_a; keys absent from df_b get missing vals_b.
merged = df_a.merge(df_b, on=['key'], how='left')
merged.compute()

# Grouping

Virtual Dataframe supports the Split-Apply-Combine groupby paradigm.

In [None]:
# Build every column directly in the constructor. Assigning pandas `pd.Series`
# objects into the VDataFrame afterwards would mix a pandas object into a frame
# whose backend may be cudf or dask. Calling `len(df)` would also force a
# computation in the dask modes.
#   agg_col1: 1 on even rows, 0 otherwise.
#   agg_col2: 1 on rows that are multiples of 3, 0 otherwise.
df = vdf.VDataFrame({'a': list(range(20)),
                     'b': list(reversed(range(20))),
                     'c': list(range(20)),
                     'agg_col1': [1 if x % 2 == 0 else 0 for x in range(20)],
                     'agg_col2': [1 if x % 3 == 0 else 0 for x in range(20)],
                    })

In [None]:
# Materialise `df` into its backend object, then rebuild a VDataFrame from it
# with 2 partitions (presumably what `from_backend` does — confirm).
ddf = vdf.from_backend(df.compute(), npartitions=2)
ddf.compute()

Grouping and then applying the sum function to the grouped data.

In [None]:
# Sum of the remaining columns within each `agg_col1` group.
df.groupby('agg_col1').sum().compute()

Grouping hierarchically then applying the sum function to grouped data.

In [None]:
# Group by the (agg_col1, agg_col2) pair, then sum the remaining columns.
df.groupby(['agg_col1', 'agg_col2']).sum().compute()

Grouping and applying statistical functions to specific columns, using agg.

In [None]:
# A different aggregation per column: max of `a`, mean of `b`, sum of `c`.
df.groupby('agg_col1').agg({'a':'max', 'b':'mean', 'c':'sum'}).compute()

# Categoricals

`VDataFrames` support categorical columns.

In [None]:
# `vdf` is already imported in the initialisation cell; no re-import needed here.
# String arrays are not yet implemented in dask_cudf, so integer grades are used
# instead of the string version kept below for reference:
#gdf = vdf.VDataFrame({"id": [1, 2, 3, 4, 5, 6], "grade":['a', 'b', 'b', 'a', 'a', 'e']}, npartitions=2)
gdf = vdf.VDataFrame({"id": [1, 2, 3, 4, 5, 6], "grade":[101, 102, 103, 104, 105, 106]}, npartitions=2)
gdf['grade'] = gdf['grade'].astype('category')
if vdf.VDF_MODE in (vdf.Mode.dask,vdf.Mode.dask_cudf):
    # Dask categoricals created by `astype` may have "unknown" categories.
    # `as_known()` computes them, presumably so that the `.cat` accessors used
    # below work.
    gdf['grade'] = gdf.grade.cat.as_known()
gdf.dtypes

Accessing the categories of a column.

> Note that this is currently not supported in `dask-cudf`, so no example is shown here.

Accessing the underlying code values of each categorical observation.

In [None]:
# Convert columns to categoricals with known categories, then materialise.
# (These are dask `categorize` semantics — TODO confirm the behaviour in other modes.)
gdf.categorize().compute()

In [None]:
# Integer code backing each categorical value of `grade`.
gdf.grade.cat.codes.compute()

# Converting Data Representation

## Pandas
Converting a `VDataFrame` to a pandas DataFrame.

In [None]:
# First rows of `df`, converted to a pandas DataFrame.
df.head().to_pandas()

## Numpy
Converting a `VDataFrame` to a numpy ndarray.

In [None]:
# The whole frame as a numpy ndarray.
df.to_numpy()

Converting a `VSeries` to a numpy ndarray.

In [None]:
# Column `a` (a VSeries) as a numpy ndarray.
df['a'].to_numpy()

# Delayed

In [None]:
# Short alias used only in the type annotations below.
TestDF = vdf.VDataFrame

# `vdf.delayed` defers the call; `.compute()` below actually runs it
# (presumably mirroring `dask.delayed` — confirm in the non-dask modes).
@vdf.delayed
def add_one(data: TestDF) -> TestDF:
    """Return ``data + 1``."""
    return data + 1


add_one(df).compute()

# Getting Data In/Out

## CSV
Writing to a CSV file.

In [None]:
import shutil
# Start from an empty `example_output` directory so that files from a previous
# run are not picked up by the wildcard read further down.
if os.path.exists('example_output'):
    shutil.rmtree('example_output')
os.mkdir('example_output')
# Single-file alternative (dask `to_csv(..., single_file=True)`), kept for reference:
#df.to_csv('example_output/foo.csv', single_file=True)
# In the dask modes, the `*` in the file name is replaced by the partition number
# (one file per partition). TODO confirm how vdf handles `*` in `pandas`/`cudf` modes.
df.to_csv('example_output/foo*.csv')

In [None]:
# Files written by the previous cell. Pure Python instead of `!ls`, so that this
# also works on platforms without a POSIX shell; sorted for a stable display.
sorted(os.listdir('example_output'))

Reading from a csv file.

In [None]:
# Read every file matching the wildcard back into a single VDataFrame (rebinds `df`).
df = vdf.read_csv('example_output/foo*.csv')
df.compute()

Reading all CSV files in a directory into a single `VDataFrame` (a `dask_cudf.DataFrame` in `dask-cudf` mode), using the star wildcard.
> **TODO:** confirm whether the star wildcard is supported in the `pandas` and `cudf` modes.

## Parquet
Writing to parquet files, using the CPU via PyArrow.

In [None]:
# Write `df` as parquet under `example_output/temp_parquet`.
df.to_parquet('example_output/temp_parquet')
