In [1]:
from dask.distributed import LocalCluster, Client

In [3]:
#cluster = LocalCluster(n_workers = 5, memory_limit = 0.1, processes = False)

## Without Clusters: Dask DataFrames

In [4]:
import os              
import urllib 

import dask.dataframe as dd

In [5]:
# `import urllib` on its own does not guarantee that the `request` submodule is
# loaded, so import it explicitly; otherwise this cell only works when some
# other package happens to have imported urllib.request already.
import urllib.request

# Direct download link for the CSV file (hosted at arcticdata.io).
url = 'https://arcticdata.io/metacat/d1/mn/v2/object/urn%3Auuid%3A27e4043d-75eb-4c4f-9427-0d442526c154'

# Download the file into the current working directory as dg_soil_moisture.csv.
# urlretrieve returns a (local_filename, headers) tuple, kept here as `msg`.
msg = urllib.request.urlretrieve(url, "dg_soil_moisture.csv")

In [10]:
# Full path to the CSV that was saved in the current working directory.
fp = os.path.join(os.getcwd(), 'dg_soil_moisture.csv')

# blocksize controls how big (in memory) each partition of the file should be;
# with 20MB blocks this file ends up split into 6 partitions (see output below).
# encoding: see notebook to determine the encoding parameter.
df = dd.read_csv(
    fp,
    encoding='ISO-8859-1',
    blocksize='20MB',
)

# Displaying df shows only the column names, dtypes and number of partitions:
# Dask knows where the data is and how to chunk it, but has not loaded the rows.
# This laziness is what lets it work with files that are too big for memory;
# you can think of df as a placeholder (a future object of sorts) for the data.
df

Unnamed: 0_level_0,timestamp,year,doy,hour,minute,site,logger,port,sensor,sensorZ,m_soil,unit
npartitions=6,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1
,object,int64,int64,int64,int64,object,object,object,object,int64,float64,object
,...,...,...,...,...,...,...,...,...,...,...,...
...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...


In [11]:
# Yearly mean of each numeric measurement column.
# The numeric columns are selected explicitly: timestamp, site, logger, port,
# sensor and unit are string (object) columns, and recent pandas versions raise
# an error when asked to average them instead of silently dropping them.
averages = df.groupby('year')[['doy', 'hour', 'minute', 'sensorZ', 'm_soil']].mean()
averages
# Nothing has been calculated yet: this is a "lazy computation". Dask has only
# recorded what to do, and you have to call .compute() to make it run.

Unnamed: 0_level_0,doy,hour,minute,sensorZ,m_soil
npartitions=1,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
,float64,float64,float64,float64,float64
,...,...,...,...,...


In [12]:
# compute() is what triggers the real work: the CSV is read in, partitioned as
# specified in read_csv, and the per-year averages are calculated.
# The full dataset is not kept in memory as a single object -- that is the point
# of using Dask for data that may be too big for memory; only the (small)
# aggregated result is returned.
# No cluster was created above, so our workers here are threads.
averages.compute()

Unnamed: 0_level_0,doy,hour,minute,sensorZ,m_soil
year,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
2014,276.852636,11.513992,15.001123,-11.998332,0.273744
2015,186.720383,11.500723,15.0,-13.047899,0.263738
2016,183.497453,11.499777,14.999858,-15.000009,0.293595
2017,181.414843,11.499381,15.000144,-14.999981,0.266121
2018,201.824077,11.500796,15.000356,-15.435365,0.282395
2019,173.693311,11.498577,15.0,-15.124516,0.222193
2020,138.806679,11.489825,14.999322,-15.200054,0.252467


## Dask Arrays

In [13]:
import numpy as np
import dask.array as da

In [15]:
# Example of building a Dask array from scratch:
# 1. create a NumPy array -- arange gives a linear run of 100,000 integers,
#    which is then reshaped into 200 rows x 500 columns
data = np.reshape(np.arange(100_000), (200, 500))

# 2. wrap it as a Dask array, split into smaller NumPy arrays of 100 x 100 each
#    (10 chunks in total for this shape)
a = da.from_array(data, chunks=(100, 100))

# How the data is partitioned into chunks affects how it gets processed.
a

Unnamed: 0,Array,Chunk
Bytes,781.25 kiB,78.12 kiB
Shape,"(200, 500)","(100, 100)"
Count,1 Graph Layer,10 Chunks
Type,int64,numpy.ndarray
"Array Chunk Bytes 781.25 kiB 78.12 kiB Shape (200, 500) (100, 100) Count 1 Graph Layer 10 Chunks Type int64 numpy.ndarray",500  200,

Unnamed: 0,Array,Chunk
Bytes,781.25 kiB,78.12 kiB
Shape,"(200, 500)","(100, 100)"
Count,1 Graph Layer,10 Chunks
Type,int64,numpy.ndarray


In [17]:
# The whole point of Dask is to accumulate as many instructions as possible and
# run them together, so call compute() once at the end rather than after every step.
a.mean().compute()

49999.5

## With Clusters

In [None]:
#client = Client(address = "tcp://128.111.85.28:8786")