In [1]:
import numpy as np
import pandas as pd
import dask.dataframe as dd
import dask.array as da
import dask.bag as db

### create a pandas data frame 

In [2]:
index = pd.date_range("2021-09-01", periods=2400, freq="1H")

In [3]:
index

DatetimeIndex(['2021-09-01 00:00:00', '2021-09-01 01:00:00',
               '2021-09-01 02:00:00', '2021-09-01 03:00:00',
               '2021-09-01 04:00:00', '2021-09-01 05:00:00',
               '2021-09-01 06:00:00', '2021-09-01 07:00:00',
               '2021-09-01 08:00:00', '2021-09-01 09:00:00',
               ...
               '2021-12-09 14:00:00', '2021-12-09 15:00:00',
               '2021-12-09 16:00:00', '2021-12-09 17:00:00',
               '2021-12-09 18:00:00', '2021-12-09 19:00:00',
               '2021-12-09 20:00:00', '2021-12-09 21:00:00',
               '2021-12-09 22:00:00', '2021-12-09 23:00:00'],
              dtype='datetime64[ns]', length=2400, freq='H')

In [4]:
df = pd.DataFrame({"a": np.arange(2400), "b": list("RuchiAme" * 300)}, index=index)

In [5]:
df

Unnamed: 0,a,b
2021-09-01 00:00:00,0,R
2021-09-01 01:00:00,1,u
2021-09-01 02:00:00,2,c
2021-09-01 03:00:00,3,h
2021-09-01 04:00:00,4,i
...,...,...
2021-12-09 19:00:00,2395,h
2021-12-09 20:00:00,2396,i
2021-12-09 21:00:00,2397,A
2021-12-09 22:00:00,2398,m


In [9]:
list("abc"*2)

['a', 'b', 'c', 'a', 'b', 'c']

In [13]:
df.head(10)

Unnamed: 0,a,b
2021-09-01 00:00:00,0,R
2021-09-01 01:00:00,1,u
2021-09-01 02:00:00,2,c
2021-09-01 03:00:00,3,h
2021-09-01 04:00:00,4,i
2021-09-01 05:00:00,5,A
2021-09-01 06:00:00,6,m
2021-09-01 07:00:00,7,e
2021-09-01 08:00:00,8,R
2021-09-01 09:00:00,9,u


### craete a dask dataframe out of pandas

In [23]:
#Now we have a DataFrame with 2 columns and 2400 rows composed of 10
#partitions where each partition has 240 rows. Each partition represents a piece of the data.
ddf = dd.from_pandas(df, npartitions=10)

In [15]:
ddf

Unnamed: 0_level_0,a,b
npartitions=10,Unnamed: 1_level_1,Unnamed: 2_level_1
2021-09-01 00:00:00,int64,object
2021-09-11 00:00:00,...,...
...,...,...
2021-11-30 00:00:00,...,...
2021-12-09 23:00:00,...,...


In [20]:
ddf.shape

(Delayed('int-698ec861-8505-46c0-9e1e-acc2d50f2df5'), 2)

In [17]:
type(ddf)

dask.dataframe.core.DataFrame

In [24]:
# check the index values covered by each partition
ddf.divisions

(Timestamp('2021-09-01 00:00:00', freq='H'),
 Timestamp('2021-09-11 00:00:00', freq='H'),
 Timestamp('2021-09-21 00:00:00', freq='H'),
 Timestamp('2021-10-01 00:00:00', freq='H'),
 Timestamp('2021-10-11 00:00:00', freq='H'),
 Timestamp('2021-10-21 00:00:00', freq='H'),
 Timestamp('2021-10-31 00:00:00', freq='H'),
 Timestamp('2021-11-10 00:00:00', freq='H'),
 Timestamp('2021-11-20 00:00:00', freq='H'),
 Timestamp('2021-11-30 00:00:00', freq='H'),
 Timestamp('2021-12-09 23:00:00', freq='H'))

In [25]:
# access a particular partition
ddf.partitions[1]

Unnamed: 0_level_0,a,b
npartitions=1,Unnamed: 1_level_1,Unnamed: 2_level_1
2021-09-11,int64,object
2021-09-21,...,...


In [26]:
ddf.b

Dask Series Structure:
npartitions=10
2021-09-01 00:00:00    object
2021-09-11 00:00:00       ...
                        ...  
2021-11-30 00:00:00       ...
2021-12-09 23:00:00       ...
Name: b, dtype: object
Dask Name: getitem, 20 tasks

## Computation

Dask is lazily evaluated. The result from a computation isn’t computed until you ask for it. Instead, a Dask task graph for the computation is produced.

Anytime you have a Dask object and you want to get the result, call compute:

In [27]:
ddf["2021-10-01": "2021-10-09 5:00"].compute()

Unnamed: 0,a,b
2021-10-01 00:00:00,720,R
2021-10-01 01:00:00,721,u
2021-10-01 02:00:00,722,c
2021-10-01 03:00:00,723,h
2021-10-01 04:00:00,724,i
...,...,...
2021-10-09 01:00:00,913,u
2021-10-09 02:00:00,914,c
2021-10-09 03:00:00,915,h
2021-10-09 04:00:00,916,i


In [28]:
ddf["2021-09-01": "2021-10-09 5:00"].compute()

Unnamed: 0,a,b
2021-09-01 00:00:00,0,R
2021-09-01 01:00:00,1,u
2021-09-01 02:00:00,2,c
2021-09-01 03:00:00,3,h
2021-09-01 04:00:00,4,i
...,...,...
2021-10-09 01:00:00,913,u
2021-10-09 02:00:00,914,c
2021-10-09 03:00:00,915,h
2021-10-09 04:00:00,916,i


In [29]:
ddf.a.mean()

dd.Scalar<series-..., dtype=float64>

In [30]:
ddf.a.mean().compute()

1199.5

In [31]:
ddf.b.unique()

Dask Series Structure:
npartitions=1
    object
       ...
Name: b, dtype: object
Dask Name: unique-agg, 33 tasks

In [32]:
ddf.b.unique().compute()

0    R
1    u
2    c
3    h
4    i
5    A
6    m
7    e
Name: b, dtype: object

In [33]:
result = ddf["2021-10-01": "2021-10-09 5:00"].a.cumsum() - 100

In [34]:
result

Dask Series Structure:
npartitions=1
2021-10-01 00:00:00.000000000    int64
2021-10-09 05:00:59.999999999      ...
Name: a, dtype: int64
Dask Name: sub, 16 tasks

In [35]:
result.compute()

2021-10-01 00:00:00       620
2021-10-01 01:00:00      1341
2021-10-01 02:00:00      2063
2021-10-01 03:00:00      2786
2021-10-01 04:00:00      3510
                        ...  
2021-10-09 01:00:00    158301
2021-10-09 02:00:00    159215
2021-10-09 03:00:00    160130
2021-10-09 04:00:00    161046
2021-10-09 05:00:00    161963
Freq: H, Name: a, Length: 198, dtype: int64

In [36]:
result.dask

0,1
"layer_type  MaterializedLayer  is_materialized  True  npartitions  10  columns  ['a', 'b']  type  dask.dataframe.core.DataFrame  dataframe_type  pandas.core.frame.DataFrame  series_dtypes  {'a': dtype('int64'), 'b': dtype('O')}",

0,1
layer_type,MaterializedLayer
is_materialized,True
npartitions,10
columns,"['a', 'b']"
type,dask.dataframe.core.DataFrame
dataframe_type,pandas.core.frame.DataFrame
series_dtypes,"{'a': dtype('int64'), 'b': dtype('O')}"

0,1
"layer_type  MaterializedLayer  is_materialized  True  npartitions  1  columns  ['a', 'b']  type  dask.dataframe.core.DataFrame  dataframe_type  pandas.core.frame.DataFrame  series_dtypes  {'a': dtype('int64'), 'b': dtype('O')}",

0,1
layer_type,MaterializedLayer
is_materialized,True
npartitions,1
columns,"['a', 'b']"
type,dask.dataframe.core.DataFrame
dataframe_type,pandas.core.frame.DataFrame
series_dtypes,"{'a': dtype('int64'), 'b': dtype('O')}"

0,1
layer_type  Blockwise  is_materialized  False,

0,1
layer_type,Blockwise
is_materialized,False

0,1
layer_type  Blockwise  is_materialized  False,

0,1
layer_type,Blockwise
is_materialized,False

0,1
layer_type  Blockwise  is_materialized  False,

0,1
layer_type,Blockwise
is_materialized,False

0,1
layer_type  MaterializedLayer  is_materialized  True,

0,1
layer_type,MaterializedLayer
is_materialized,True

0,1
layer_type  Blockwise  is_materialized  True,

0,1
layer_type,Blockwise
is_materialized,True


In [37]:
result.visualize()

RuntimeError: Drawing dask graphs requires the `graphviz` python library and the `graphviz` system library to be installed.

In [39]:
#!pip install graphviz

In [40]:
result.visualize()

ExecutableNotFound: failed to execute PosixPath('dot'), make sure the Graphviz executables are on your systems' PATH

In [41]:
import numpy as np

In [8]:
i = np.ones((1,2))

In [10]:
ii = np.ones((1000,100,100))

In [11]:
ii

array([[[1., 1., 1., ..., 1., 1., 1.],
        [1., 1., 1., ..., 1., 1., 1.],
        [1., 1., 1., ..., 1., 1., 1.],
        ...,
        [1., 1., 1., ..., 1., 1., 1.],
        [1., 1., 1., ..., 1., 1., 1.],
        [1., 1., 1., ..., 1., 1., 1.]],

       [[1., 1., 1., ..., 1., 1., 1.],
        [1., 1., 1., ..., 1., 1., 1.],
        [1., 1., 1., ..., 1., 1., 1.],
        ...,
        [1., 1., 1., ..., 1., 1., 1.],
        [1., 1., 1., ..., 1., 1., 1.],
        [1., 1., 1., ..., 1., 1., 1.]],

       [[1., 1., 1., ..., 1., 1., 1.],
        [1., 1., 1., ..., 1., 1., 1.],
        [1., 1., 1., ..., 1., 1., 1.],
        ...,
        [1., 1., 1., ..., 1., 1., 1.],
        [1., 1., 1., ..., 1., 1., 1.],
        [1., 1., 1., ..., 1., 1., 1.]],

       ...,

       [[1., 1., 1., ..., 1., 1., 1.],
        [1., 1., 1., ..., 1., 1., 1.],
        [1., 1., 1., ..., 1., 1., 1.],
        ...,
        [1., 1., 1., ..., 1., 1., 1.],
        [1., 1., 1., ..., 1., 1., 1.],
        [1., 1., 1., ..., 1., 1.

#### if we were to have array with more dimensions then this program would run and computer will hang 

## so instead we can use dask for that

In [12]:
import dask.array as da

In [13]:
ii = da.ones((1000,100,100))

In [14]:
ii

Unnamed: 0,Array,Chunk
Bytes,76.29 MiB,76.29 MiB
Shape,"(1000, 100, 100)","(1000, 100, 100)"
Count,1 Tasks,1 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 76.29 MiB 76.29 MiB Shape (1000, 100, 100) (1000, 100, 100) Count 1 Tasks 1 Chunks Type float64 numpy.ndarray",100  100  1000,

Unnamed: 0,Array,Chunk
Bytes,76.29 MiB,76.29 MiB
Shape,"(1000, 100, 100)","(1000, 100, 100)"
Count,1 Tasks,1 Chunks
Type,float64,numpy.ndarray


In [17]:
ii = da.ones((10000,100,100))
ii

Unnamed: 0,Array,Chunk
Bytes,762.94 MiB,95.37 MiB
Shape,"(10000, 100, 100)","(1250, 100, 100)"
Count,8 Tasks,8 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 762.94 MiB 95.37 MiB Shape (10000, 100, 100) (1250, 100, 100) Count 8 Tasks 8 Chunks Type float64 numpy.ndarray",100  100  10000,

Unnamed: 0,Array,Chunk
Bytes,762.94 MiB,95.37 MiB
Shape,"(10000, 100, 100)","(1250, 100, 100)"
Count,8 Tasks,8 Chunks
Type,float64,numpy.ndarray


In [19]:
#this line failed in my computer with numpy, therefore running now with dask
ii = da.ones((100000,100,100))
ii

Unnamed: 0,Array,Chunk
Bytes,7.45 GiB,95.37 MiB
Shape,"(100000, 100, 100)","(1250, 100, 100)"
Count,80 Tasks,80 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 7.45 GiB 95.37 MiB Shape (100000, 100, 100) (1250, 100, 100) Count 80 Tasks 80 Chunks Type float64 numpy.ndarray",100  100  100000,

Unnamed: 0,Array,Chunk
Bytes,7.45 GiB,95.37 MiB
Shape,"(100000, 100, 100)","(1250, 100, 100)"
Count,80 Tasks,80 Chunks
Type,float64,numpy.ndarray
