# Dask arrays

Dask provides collections that are internally split into **chunks** and processed in parallel:
- Numpy-like arrays
- Pandas-like dataframes

# 1) Dask (Numpy-like) arrays

## 1.1) Understanding Numpy's limitations

In [1]:
# Numpy must hold the whole array in RAM, so an array larger than the available memory cannot be created
import numpy as np

try:
    large_array = np.ones((100000, 100000), dtype=np.float64)
except MemoryError as e:
    print("Memory error!", e)

Memory error! Unable to allocate 74.5 GiB for an array with shape (100000, 100000) and data type float64
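
The 74.5 GiB reported in the error message follows directly from the shape and the dtype. A quick check in plain Python (the variable names are only illustrative):

```python
# Memory needed for a dense float64 array of shape (100000, 100000)
rows, cols = 100_000, 100_000
bytes_per_float64 = 8  # one float64 value occupies 8 bytes

total_bytes = rows * cols * bytes_per_float64
total_gib = total_bytes / 2**30  # 1 GiB = 2**30 bytes

print(f"{total_gib:.1f} GiB")
```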


In [2]:
# Let's do it with Dask, which creates arrays using CHUNKING
import dask.array as da

# Here we tell Dask to split the array into chunks of 1000 x 1000 elements
size = 100000
dask_array = da.ones((size, size), chunks=(1000, 1000), dtype=np.float64)
print(dask_array)

dask.array<ones_like, shape=(100000, 100000), dtype=float64, chunksize=(1000, 1000), chunktype=numpy.ndarray>
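
Creating this array succeeds because Dask only records the chunk layout at this point; no chunk is allocated until a computation asks for it. The layout can be worked out by hand. The sketch below uses only the standard library and mirrors the `shape` and `chunks` values from the cell above:

```python
import math

shape = (100_000, 100_000)
chunks = (1_000, 1_000)
bytes_per_float64 = 8

# Number of chunks along each axis (ceil covers shapes that do not divide evenly)
chunks_per_axis = tuple(math.ceil(s / c) for s, c in zip(shape, chunks))
n_chunks = math.prod(chunks_per_axis)

# Size of one full chunk, in bytes and in MiB
chunk_bytes = math.prod(chunks) * bytes_per_float64
chunk_mib = chunk_bytes / 2**20

print(chunks_per_axis, n_chunks, f"{chunk_mib:.2f} MiB per chunk")
```

Each chunk is small enough to be processed on its own, which is what makes the 74.5 GiB array tractable.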


## 1.2) Converting a Numpy array to Dask array

In [4]:
np_array = np.ones((10000, 10000), dtype=np.float64)
dask_from_numpy = da.from_array(np_array, chunks=(1000, 1000))
print(dask_from_numpy)

dask.array<array, shape=(10000, 10000), dtype=float64, chunksize=(1000, 1000), chunktype=numpy.ndarray>


## 1.3) Converting a Dask array back to Numpy

Call the **compute()** method. It executes the pending computation and returns a regular Numpy array, so the result has to fit in memory.

In [5]:
np_result = dask_from_numpy.compute()
print(np_result)

[[1. 1. 1. ... 1. 1. 1.]
 [1. 1. 1. ... 1. 1. 1.]
 [1. 1. 1. ... 1. 1. 1.]
 ...
 [1. 1. 1. ... 1. 1. 1.]
 [1. 1. 1. ... 1. 1. 1.]
 [1. 1. 1. ... 1. 1. 1.]]
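
Until `compute()` is called, a Dask collection only describes the work to be done. The toy sketch below illustrates that idea with a dict-based task graph, where each task is a tuple of a function and its arguments. `toy_get` is a hypothetical helper written for this illustration; it is not Dask's scheduler.

```python
from operator import add


def toy_get(graph, key):
    """Recursively evaluate `key` in a dict-based task graph."""
    task = graph[key]
    if isinstance(task, tuple) and task and callable(task[0]):
        func, *args = task
        # Arguments that name other tasks are evaluated first
        resolved = [
            toy_get(graph, arg) if isinstance(arg, str) and arg in graph else arg
            for arg in args
        ]
        return func(*resolved)
    return task  # a literal value


# Building the graph performs no work; it only records what to do
graph = {
    "chunk-0": (sum, [1.0, 1.0, 1.0]),
    "chunk-1": (sum, [1.0, 1.0]),
    "total": (add, "chunk-0", "chunk-1"),
}

result = toy_get(graph, "total")  # the work happens only here
print(result)
```

The two chunk sums do not depend on each other, which is what allows a real scheduler to run such tasks in parallel.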


# 2) Dask (Pandas-like) Data Frames

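Dask dataframes overcome the same memory limitations: the data is split into partitions that are loaded only when needed. They also replace the single-threaded computation of Pandas with parallel computation across partitions.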

## 2.1) Pandas dataframe limitations

In [6]:
#import pandas as pd
import dask.dataframe as dd # you have to install pyarrow

In [7]:
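# Loading a file that is larger than the available RAM with Pandas produces a memory error.
# Dask reads it lazily, one partition at a time.
# dtype forces number_records to float64, presumably because the column contains
# missing values that an integer dtype cannot represent.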

df = dd.read_csv("../data/systems_20250729.csv", dtype={'number_records': 'float64'})
df.head()

Unnamed: 0,system_id,system_public_name,site_location,timezone_or_utc_offset,latitude,longitude,elevation_m,dc_capacity_kW,kg_climate,pvcz_composite,...,number_records,dataset_size_mb,available_sensor_channels,qa_status,qa_issue,Unnamed: 26,Unnamed: 27,Unnamed: 28,Unnamed: 29,Unnamed: 30
0,2,Residential 1a,"Lakewood, CO",America/Denver,39.7214,-105.0972,1675.0,2.912,Dfb,12,...,13685898.0,313.25,7,fail,less than 1.0 years data,,,,,
1,3,Residential 1b,"Lakewood, CO",America/Denver,39.7214,-105.0972,1675.0,2.72,Dfb,12,...,12668178.0,289.95,7,fail,,,,,,
2,4,NREL x-Si -1,"Golden, CO",7,39.7406,-105.1774,1795.3,1.0,BSk,12,...,113978017.0,2608.75,15,pass,"Filtered time series less than 1.0 years data,...",,,,,
3,10,NREL CIS -1,"Golden, CO",7,39.7404,-105.1774,1792.8,1.12,BSk,12,...,113103574.0,2588.74,14,pass,Filtered time series less than 1.0 years data,,,,,
4,33,Silicor Materials,"Golden, CO",7,39.7404,-105.1772,1794.0,2.4,BSk,12,...,113673602.0,2601.78,15,pass,"Percent clipping exceeded threshold of 10%, Fi...",,,,,


In [8]:
# The interface is the same as pandas
df.dtypes


system_id                              int64
system_public_name           string[pyarrow]
site_location                string[pyarrow]
timezone_or_utc_offset       string[pyarrow]
latitude                             float64
longitude                            float64
elevation_m                          float64
dc_capacity_kW                       float64
kg_climate                   string[pyarrow]
pvcz_composite                         int64
pvcz_t_rack                            int64
pvcz_t_roof                            int64
pvcz_humidity                          int64
pvcz_wind                              int64
tracking                     string[pyarrow]
type                         string[pyarrow]
azimuth                              float64
tilt                                 float64
first_timestamp              string[pyarrow]
last_timestamp               string[pyarrow]
years                                float64
number_records                       float64
dataset_si

In [9]:
df.columns

Index(['system_id', 'system_public_name', 'site_location',
       'timezone_or_utc_offset', 'latitude', 'longitude', 'elevation_m',
       'dc_capacity_kW', 'kg_climate', 'pvcz_composite', 'pvcz_t_rack',
       'pvcz_t_roof', 'pvcz_humidity', 'pvcz_wind', 'tracking', 'type',
       'azimuth', 'tilt', 'first_timestamp', 'last_timestamp', 'years',
       'number_records', 'dataset_size_mb', 'available_sensor_channels',
       'qa_status', 'qa_issue', 'Unnamed: 26', 'Unnamed: 27', 'Unnamed: 28',
       'Unnamed: 29', 'Unnamed: 30'],
      dtype='object')

## 2.2) Performing computations on dataframes

Here again, nothing is evaluated until you call **compute()** (lazy evaluation)

In [10]:
# NOTE: isnull() indicates whether values are missing (NaN in numeric columns, None, etc.)
# We can count the missing values per column
df.isnull().sum().compute()

system_id                       0
system_public_name              0
site_location                 240
timezone_or_utc_offset          0
latitude                        0
longitude                       0
elevation_m                     0
dc_capacity_kW                 65
kg_climate                    240
pvcz_composite                  0
pvcz_t_rack                     0
pvcz_t_roof                     0
pvcz_humidity                   0
pvcz_wind                       0
tracking                      241
type                          241
azimuth                       241
tilt                          290
first_timestamp                 2
last_timestamp                  2
years                           2
number_records                  2
dataset_size_mb                 2
available_sensor_channels       0
qa_status                       0
qa_issue                     1403
Unnamed: 26                  1862
Unnamed: 27                  1862
Unnamed: 28                  1862
Unnamed: 29   
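
Conceptually, a reduction like this is computed on each partition separately and the partial results are then combined. A minimal sketch of that pattern in plain Python follows; the miniature CSV and the helpers `partitions` and `count_missing` are made up for illustration and are not how Dask is implemented.

```python
import csv
import io
from collections import Counter

# Hypothetical miniature CSV standing in for the real file
raw = io.StringIO(
    "system_id,site_location,tilt\n"
    "2,Lakewood,20\n"
    "3,,\n"
    "4,Golden,\n"
    "10,,35\n"
)


def partitions(rows, size):
    """Yield successive lists of at most `size` rows."""
    block = []
    for row in rows:
        block.append(row)
        if len(block) == size:
            yield block
            block = []
    if block:
        yield block


def count_missing(block):
    """Count empty fields per column within one partition."""
    counts = Counter()
    for row in block:
        for column, value in row.items():
            counts[column] += int(value == "")
    return counts


# Reduce each partition independently, then add up the partial counts
missing = Counter()
for block in partitions(csv.DictReader(raw), size=2):
    missing.update(count_missing(block))

print(dict(missing))
```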

# 3) Dask bags: managing Python objects that do not fit in tabular format

Ideal for unstructured/semi-structured data such as JSON, text files and logs

## 3.1) Loading data

In [11]:
# ex: large json files or Python objects
# For this, starting a Dask Client is optional but recommended

from dask.distributed import Client

client = Client(n_workers=4, threads_per_worker=1)



In [12]:
import json, os
import dask
# the mimesis module has to be installed with pip

datadir = "../data"

# Dask provides built-in datasets for testing purposes
data = dask.datasets.make_people()  # records from random people

data

dask.bag<mimesis, npartitions=10>

In [13]:
# Let's have a look
data.take(2)

({'age': 69,
  'name': ('Murray', 'Barber'),
  'occupation': 'Jewellery',
  'telephone': '+1-620-622-8516',
  'address': {'address': '1117 Bitting Rapids', 'city': 'Ashwaubenon'},
  'credit-card': {'number': '3460 341934 81399', 'expiration-date': '04/17'}},
 {'age': 7,
  'name': ('Joe', 'Stevens'),
  'occupation': 'Minicab Driver',
  'telephone': '+1-623-953-1143',
  'address': {'address': '420 Lisbon Court', 'city': 'Pinole'},
  'credit-card': {'number': '5243 3396 4233 7529',
   'expiration-date': '09/20'}})

In [14]:
# This saves the generated records as json files in the data directory
data.map(json.dumps).to_textfiles(os.path.join(datadir,'json/*.json'))

['/home/nicolas/CODE/daskscratch/notebooks/../data/json/0.json',
 '/home/nicolas/CODE/daskscratch/notebooks/../data/json/1.json',
 '/home/nicolas/CODE/daskscratch/notebooks/../data/json/2.json',
 '/home/nicolas/CODE/daskscratch/notebooks/../data/json/3.json',
 '/home/nicolas/CODE/daskscratch/notebooks/../data/json/4.json',
 '/home/nicolas/CODE/daskscratch/notebooks/../data/json/5.json',
 '/home/nicolas/CODE/daskscratch/notebooks/../data/json/6.json',
 '/home/nicolas/CODE/daskscratch/notebooks/../data/json/7.json',
 '/home/nicolas/CODE/daskscratch/notebooks/../data/json/8.json',
 '/home/nicolas/CODE/daskscratch/notebooks/../data/json/9.json']

## 3.2) Generating bags

In [15]:
import dask.bag as db

In [16]:
# Read the json files previously generated
b = db.read_text(os.path.join(datadir, 'json/*.json'))
b

dask.bag<bag-from-delayed, npartitions=10>

In [17]:
# Convert them to Python dicts
d = b.map(json.loads)
d

dask.bag<loads, npartitions=10>

In [18]:
# Have a look: take() returns a tuple of dicts
d.take(3)

({'age': 69,
  'name': ['Murray', 'Barber'],
  'occupation': 'Jewellery',
  'telephone': '+1-620-622-8516',
  'address': {'address': '1117 Bitting Rapids', 'city': 'Ashwaubenon'},
  'credit-card': {'number': '3460 341934 81399', 'expiration-date': '04/17'}},
 {'age': 7,
  'name': ['Joe', 'Stevens'],
  'occupation': 'Minicab Driver',
  'telephone': '+1-623-953-1143',
  'address': {'address': '420 Lisbon Court', 'city': 'Pinole'},
  'credit-card': {'number': '5243 3396 4233 7529',
   'expiration-date': '09/20'}},
 {'age': 104,
  'name': ['Conchita', 'Walter'],
  'occupation': 'Purchase Ledger Clerk',
  'telephone': '+17640929468',
  'address': {'address': '457 Kinzey Walk', 'city': 'Casper'},
  'credit-card': {'number': '4939 6561 1326 9994',
   'expiration-date': '08/19'}})
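
Note that `name` was a tuple in the generated records and is now a list. This is a side effect of the JSON round trip: JSON has only one array type, which Python loads as a list. A short standard-library check, using a shortened copy of the first record:

```python
import json

record = {"age": 69, "name": ("Murray", "Barber"), "occupation": "Jewellery"}

line = json.dumps(record)    # the record serialized as one line of text
restored = json.loads(line)  # back to a Python dict

print(type(record["name"]).__name__, "->", type(restored["name"]).__name__)
```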

## 3.3) Mapping and filtering data in bags  

In [19]:
# Dask bags allow us to perform operations such as filtering, mapping and aggregation

# Ex: keep only records with age > 30 using FILTER
filtered = d.filter(lambda record: record['age'] > 30).take(3)
filtered

({'age': 69,
  'name': ['Murray', 'Barber'],
  'occupation': 'Jewellery',
  'telephone': '+1-620-622-8516',
  'address': {'address': '1117 Bitting Rapids', 'city': 'Ashwaubenon'},
  'credit-card': {'number': '3460 341934 81399', 'expiration-date': '04/17'}},
 {'age': 104,
  'name': ['Conchita', 'Walter'],
  'occupation': 'Purchase Ledger Clerk',
  'telephone': '+17640929468',
  'address': {'address': '457 Kinzey Walk', 'city': 'Casper'},
  'credit-card': {'number': '4939 6561 1326 9994',
   'expiration-date': '08/19'}},
 {'age': 53,
  'name': ['Jolyn', 'Rogers'],
  'occupation': 'Ambulance Controller',
  'telephone': '+17728816113',
  'address': {'address': '397 Harry Canyon', 'city': 'Hagerstown'},
  'credit-card': {'number': '3478 017032 46101', 'expiration-date': '07/16'}})

In [20]:
# Extract a specific field from each record using MAP
d.map(lambda record: record['occupation']).take(5)

('Jewellery',
 'Minicab Driver',
 'Purchase Ledger Clerk',
 'Ambulance Controller',
 'Ambulance Driver')
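
The bag methods `filter`, `map` and `take` behave much like Python's built-in `filter` and `map` combined with `itertools.islice`, except that Dask applies them partition by partition. A plain-Python analogy on a hand-written sample (the `records` list below is illustrative, shortened from the records above):

```python
from itertools import islice

records = [
    {"age": 69, "occupation": "Jewellery"},
    {"age": 7, "occupation": "Minicab Driver"},
    {"age": 104, "occupation": "Purchase Ledger Clerk"},
    {"age": 53, "occupation": "Ambulance Controller"},
]

# Both steps return lazy iterators: nothing is evaluated yet
over_30 = filter(lambda record: record["age"] > 30, records)
occupations = map(lambda record: record["occupation"], over_30)

# Pulling the first two results is what triggers the work, like .take(2)
first_two = tuple(islice(occupations, 2))
print(first_two)
```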

## 3.4) Computations with bags

In [21]:
# Computation (lazy evaluation)

d.count().compute()

10000

In [22]:
# Chaining operations
# Combining the two previous operations

d.filter(lambda record: record['age']>30).map(lambda record: record['occupation']).take(5)

('Jewellery',
 'Purchase Ledger Clerk',
 'Ambulance Controller',
 'Palaeontologist',
 'Hospital Technician')

In [23]:
# compute() returns a list of (occupation, count) pairs
result = (
    d.filter(lambda record: record['age'] > 30)
    .map(lambda record: record['occupation'])
    .frequencies(sort=True)
    .topk(10, key=1)
    .compute()
)
result

[('Lift Attendant', 18),
 ('Analytical Chemist', 17),
 ('Labelling Operator', 17),
 ('Land Surveyor', 16),
 ('Maintenance Fitter', 16),
 ('Payroll Clerk', 15),
 ('Bailiff', 15),
 ('Watchmaker', 15),
 ('Glass Worker', 15),
 ('Repairer', 14)]
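
For intuition, `frequencies()` produces `(value, count)` pairs and `topk(10, key=1)` keeps the 10 pairs with the largest element at index 1, that is, the largest counts. A rough single-machine equivalent with the standard library, on a made-up list of occupations:

```python
from collections import Counter

occupations = ["Bailiff", "Watchmaker", "Bailiff", "Repairer", "Bailiff", "Watchmaker"]

# Equivalent of frequencies(): (value, count) pairs
pairs = list(Counter(occupations).items())

# Equivalent of topk(2, key=1): the 2 pairs with the largest count
top2 = sorted(pairs, key=lambda pair: pair[1], reverse=True)[:2]
print(top2)
```

The order of pairs with equal counts may differ between this sketch and Dask.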

## 3.5) Storing processed bag data

In [42]:
# As long as you don't call compute(), the result remains a bag object
result_bag = d.filter(lambda record: record['age'] > 50)

# Now you can dump each record to a JSON string (it is still a bag)
result_json = result_bag.map(json.dumps)
result_json

dask.bag<dumps, npartitions=10>

In [43]:
# Write it to JSON text files, one file per partition; to_textfiles() returns the list of written paths
result_json = result_json.to_textfiles(os.path.join(datadir, "json/processed/*.json"))
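
The `*` in the path is replaced by the partition number, and each record is written on its own line. The sketch below reproduces that layout with the standard library only, writing two hand-made partitions to a temporary directory. It imitates the resulting file structure and is not Dask's implementation.

```python
import json
import os
import tempfile

partitions = [
    [{"age": 69}, {"age": 104}],  # partition 0
    [{"age": 53}],                # partition 1
]

outdir = tempfile.mkdtemp()
pattern = "*.json"

written = []
for index, partition in enumerate(partitions):
    # "*" becomes the partition number: 0.json, 1.json, ...
    path = os.path.join(outdir, pattern.replace("*", str(index)))
    with open(path, "w") as handle:
        for record in partition:
            handle.write(json.dumps(record) + "\n")  # one record per line
    written.append(path)

print([os.path.basename(path) for path in written])
```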

## 3.6) Converting Dask bags to Dask dataframes

It is necessary to **flatten** the data first (i.e., remove nested structures), so that every field maps to a single column

In [44]:
def flatten(record):
    return {
        'age': record['age'],
        'occupation': record['occupation'],
        'telephone': record['telephone'],
        'credit-card-number': record['credit-card']['number'],
        'credit-card-expiration': record['credit-card']['expiration-date'],
        'name': ' '.join(record['name']),
        'street-address': record['address']['address'],
        'city': record['address']['city'],
    }
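
The function above lists every field by hand, which works well when the schema is known. When it is not, a generic recursive flattener is one possible alternative. The helper below, `flatten_generic`, is a hypothetical sketch and not part of Dask; it joins nested keys with `-` and turns lists or tuples into space-separated strings.

```python
def flatten_generic(record, parent_key="", sep="-"):
    """Flatten nested dicts into a single level, joining keys with `sep`."""
    flat = {}
    for key, value in record.items():
        full_key = f"{parent_key}{sep}{key}" if parent_key else key
        if isinstance(value, dict):
            # Recurse into nested dicts, carrying the key prefix along
            flat.update(flatten_generic(value, full_key, sep))
        elif isinstance(value, (list, tuple)):
            flat[full_key] = " ".join(map(str, value))
        else:
            flat[full_key] = value
    return flat


record = {
    "age": 69,
    "name": ["Murray", "Barber"],
    "address": {"address": "1117 Bitting Rapids", "city": "Ashwaubenon"},
    "credit-card": {"number": "3460 341934 81399", "expiration-date": "04/17"},
}

flat = flatten_generic(record)
print(flat)
```

The generated column names (for example `address-city`) differ from the hand-picked ones used in `flatten`, which is the price of not naming each field explicitly.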
    

In [46]:
result_bag

dask.bag<filter-lambda, npartitions=10>

In [45]:
# Try it on our bag object and display the first 5 records
result_dico = result_bag.map(flatten).take(5)
result_dico # Which is a tuple of dicts

({'age': 69,
  'occupation': 'Jewellery',
  'telephone': '+1-620-622-8516',
  'credit-card-number': '3460 341934 81399',
  'credit-card-expiration': '04/17',
  'name': 'Murray Barber',
  'street-address': '1117 Bitting Rapids',
  'city': 'Ashwaubenon'},
 {'age': 104,
  'occupation': 'Purchase Ledger Clerk',
  'telephone': '+17640929468',
  'credit-card-number': '4939 6561 1326 9994',
  'credit-card-expiration': '08/19',
  'name': 'Conchita Walter',
  'street-address': '457 Kinzey Walk',
  'city': 'Casper'},
 {'age': 53,
  'occupation': 'Ambulance Controller',
  'telephone': '+17728816113',
  'credit-card-number': '3478 017032 46101',
  'credit-card-expiration': '07/16',
  'name': 'Jolyn Rogers',
  'street-address': '397 Harry Canyon',
  'city': 'Hagerstown'},
 {'age': 85,
  'occupation': 'Palaeontologist',
  'telephone': '+13207796874',
  'credit-card-number': '3751 367748 29518',
  'credit-card-expiration': '12/22',
  'name': 'Layla Albert',
  'street-address': '400 Hawthorne Shore',


In [47]:
# Ok so let's convert this flattened bag object into a dask dataframe
df2 = result_bag.map(flatten).to_dataframe()
df2.head()

2025-11-06 22:00:00,769 - distributed.worker - ERROR - Compute Failed
Key:       ('bag-from-delayed-file_to_blocks-filter-lambda-flatten-list-loads-to_dataframe-1296ef8ce75a2966443e6992e0f79f39', 0)
State:     executing
Task:  <Task ('bag-from-delayed-file_to_blocks-filter-lambda-flatten-list-loads-to_dataframe-1296ef8ce75a2966443e6992e0f79f39', 0) _execute_subgraph(...)>
Exception: 'AttributeError("partially initialized module \'pandas\' has no attribute \'core\' (most likely due to a circular import)")'
Traceback: '  File "/home/nicolas/.cache/pypoetry/virtualenvs/daskscratch-x6OfnjBx-py3.12/lib/python3.12/site-packages/dask/bag/core.py", line 2612, in to_dataframe\n    import pandas as pd\n  File "/home/nicolas/.cache/pypoetry/virtualenvs/daskscratch-x6OfnjBx-py3.12/lib/python3.12/site-packages/pandas/__init__.py", line 139, in <module>\n    from pandas import testing\n  File "/home/nicolas/.cache/pypoetry/virtualenvs/daskscratch-x6OfnjBx-py3.12/lib/python3.12/site-packages/pandas/t

AttributeError: partially initialized module 'pandas' has no attribute 'core' (most likely due to a circular import)

In [29]:
type(df2)

dask.dataframe.dask_expr._collection.DataFrame

In [38]:
df2.compute()

2025-11-06 21:59:17,234 - distributed.worker - ERROR - Compute Failed
Key:       ('bag-from-delayed-file_to_blocks-filter-lambda-flatten-list-loads-to_dataframe-1296ef8ce75a2966443e6992e0f79f39', 9)
State:     executing
Task:  <Task ('bag-from-delayed-file_to_blocks-filter-lambda-flatten-list-loads-to_dataframe-1296ef8ce75a2966443e6992e0f79f39', 9) _execute_subgraph(...)>
Exception: 'AttributeError("partially initialized module \'pandas\' has no attribute \'core\' (most likely due to a circular import)")'
Traceback: '  File "/home/nicolas/.cache/pypoetry/virtualenvs/daskscratch-x6OfnjBx-py3.12/lib/python3.12/site-packages/dask/bag/core.py", line 2612, in to_dataframe\n    import pandas as pd\n  File "/home/nicolas/.cache/pypoetry/virtualenvs/daskscratch-x6OfnjBx-py3.12/lib/python3.12/site-packages/pandas/__init__.py", line 139, in <module>\n    from pandas import testing\n  File "/home/nicolas/.cache/pypoetry/virtualenvs/daskscratch-x6OfnjBx-py3.12/lib/python3.12/site-packages/pandas/t

AttributeError: partially initialized module 'pandas' has no attribute 'core' (most likely due to a circular import)

In [35]:
# Now perform some computation on the dataframe
df2[df2.age > 50].isna().compute()

2025-11-06 21:55:50,622 - distributed.worker - ERROR - Compute Failed
Key:       ('bag-from-delayed-file_to_blocks-filter-lambda-flatten-list-loads-to_dataframe-1296ef8ce75a2966443e6992e0f79f39', 2)
State:     executing
Task:  <Task ('bag-from-delayed-file_to_blocks-filter-lambda-flatten-list-loads-to_dataframe-1296ef8ce75a2966443e6992e0f79f39', 2) _execute_subgraph(...)>
Exception: 'AttributeError("partially initialized module \'pandas\' has no attribute \'core\' (most likely due to a circular import)")'
Traceback: '  File "/home/nicolas/.cache/pypoetry/virtualenvs/daskscratch-x6OfnjBx-py3.12/lib/python3.12/site-packages/dask/bag/core.py", line 2612, in to_dataframe\n    import pandas as pd\n  File "/home/nicolas/.cache/pypoetry/virtualenvs/daskscratch-x6OfnjBx-py3.12/lib/python3.12/site-packages/pandas/__init__.py", line 139, in <module>\n    from pandas import testing\n  File "/home/nicolas/.cache/pypoetry/virtualenvs/daskscratch-x6OfnjBx-py3.12/lib/python3.12/site-packages/pandas/t

AttributeError: partially initialized module 'pandas' has no attribute 'core' (most likely due to a circular import)