# Using Dask to parallalize computing

# Installing dask

In [81]:
!pip install dask
!pip install cloudpickle
!pip install "dask[dataframe]"
!pip install "dask[complete]"



In [25]:
!pip show dask

Name: dask
Version: 2022.2.1
Summary: Parallel PyData with Task Scheduling
Home-page: https://github.com/dask/dask/
Author: 
Author-email: 
License: BSD
Location: /Users/keerthan/opt/anaconda3/lib/python3.9/site-packages
Requires: partd, packaging, pyyaml, fsspec, toolz, cloudpickle
Required-by: intake, distributed, datashader


In [26]:
# imports
try:
    import os
    import json
    import math
    import dask
    from dask.distributed import Client
    import dask.dataframe as dd
    import numpy as np
    import dask.multiprocessing
except Exception as e:
    print("Some modules are missing: {}".format(e))

In [27]:
os.listdir()

['.ipynb_checkpoints',
 'ex8data1-Copy1.mat',
 'Dask.ipynb',
 'dask-worker-space',
 'netflix_titles.csv']

In [28]:
size = os.path.getsize("ex8data1-Copy1.mat") / math.pow(1024,3)
print("size in GB: {}".format(size))

size in GB: 8.848495781421661e-06


In [30]:
# setting up a client with a distributed system
client = Client(n_workers=3, threads_per_worker=1, processes=False, memory_limit="2GB")

Perhaps you already have a cluster running?
Hosting the HTTP server on port 55221 instead


In [31]:
client

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://192.168.1.138:55221/status,

0,1
Dashboard: http://192.168.1.138:55221/status,Workers: 3
Total threads: 3,Total memory: 5.59 GiB
Status: running,Using processes: False

0,1
Comm: inproc://192.168.1.138/56480/26,Workers: 3
Dashboard: http://192.168.1.138:55221/status,Total threads: 3
Started: Just now,Total memory: 5.59 GiB

0,1
Comm: inproc://192.168.1.138/56480/29,Total threads: 1
Dashboard: http://192.168.1.138:55222/status,Memory: 1.86 GiB
Nanny: None,
Local directory: /Users/keerthan/PycharmProjects/Tutorials/dask-worker-space/worker-pn_hm9hl,Local directory: /Users/keerthan/PycharmProjects/Tutorials/dask-worker-space/worker-pn_hm9hl

0,1
Comm: inproc://192.168.1.138/56480/31,Total threads: 1
Dashboard: http://192.168.1.138:55224/status,Memory: 1.86 GiB
Nanny: None,
Local directory: /Users/keerthan/PycharmProjects/Tutorials/dask-worker-space/worker-v7nctuo3,Local directory: /Users/keerthan/PycharmProjects/Tutorials/dask-worker-space/worker-v7nctuo3

0,1
Comm: inproc://192.168.1.138/56480/30,Total threads: 1
Dashboard: http://192.168.1.138:55223/status,Memory: 1.86 GiB
Nanny: None,
Local directory: /Users/keerthan/PycharmProjects/Tutorials/dask-worker-space/worker-jiwi3d10,Local directory: /Users/keerthan/PycharmProjects/Tutorials/dask-worker-space/worker-jiwi3d10


In [32]:
#####Read the file

In [33]:
#Downloaded a csv file 
os.listdir()

['.ipynb_checkpoints',
 'ex8data1-Copy1.mat',
 'Dask.ipynb',
 'dask-worker-space',
 'netflix_titles.csv']

In [34]:
data = dd.read_csv("netflix_titles.csv")

In [50]:
data = data.compute(scheduler='threads')

In [51]:
data.head(1)

Unnamed: 0,show_id,type,title,director,cast,country,date_added,release_year,rating,duration,listed_in,description
0,s1,Movie,Dick Johnson Is Dead,Kirsten Johnson,,United States,"September 25, 2021",2020,PG-13,90 min,Documentaries,"As her father nears the end of his life, filmm..."


In [52]:
data.columns

Index(['show_id', 'type', 'title', 'director', 'cast', 'country', 'date_added',
       'release_year', 'rating', 'duration', 'listed_in', 'description'],
      dtype='object')

In [53]:
data["show_id"]

0          s1
1          s2
2          s3
3          s4
4          s5
        ...  
8802    s8803
8803    s8804
8804    s8805
8805    s8806
8806    s8807
Name: show_id, Length: 8807, dtype: object

In [54]:
data.show_id.head(1)

0    s1
Name: show_id, dtype: object

In [46]:
### Apply functions

In [55]:
def toupper(x):
    return x.upper()

In [56]:
data.title.head(2)

0    Dick Johnson Is Dead
1           Blood & Water
Name: title, dtype: object

In [58]:
data.title = data["title"].map(toupper)

In [59]:
data.title.head(2)

0    DICK JOHNSON IS DEAD
1           BLOOD & WATER
Name: title, dtype: object

In [60]:
### Apply across cluster

In [61]:
A = client.map(toupper, data['title'])

In [62]:
A

[<Future: finished, type: str, key: toupper-c290769e923e5020230c3fddf19d6fe4>,
 <Future: finished, type: str, key: toupper-0a82b9d22e5a4a938086e2555ada59c9>,
 <Future: finished, type: str, key: toupper-4784e7e7a22673ec77cababc52ffc5c4>,
 <Future: finished, type: str, key: toupper-d5a1909bfa86b7b7a48b1b50c063f3f3>,
 <Future: finished, type: str, key: toupper-68ae0aac9f8c040b96c6e0f40d7b9ce4>,
 <Future: finished, type: str, key: toupper-6f93f86db9c1db8d9483bdc28b62e6c0>,
 <Future: finished, type: str, key: toupper-a2d0a1633f44765067eb23499ffa97cc>,
 <Future: finished, type: str, key: toupper-23c0fa5d85944f3ba871acc3d7ca36e8>,
 <Future: finished, type: str, key: toupper-7e0152b28a286772d9797f9bb6d50762>,
 <Future: finished, type: str, key: toupper-0de8e90c7c6827758b2129e435dd8155>,
 <Future: finished, type: str, key: toupper-8522b8aa85b8c4b6b1a67c9a0eec90e9>,
 <Future: finished, type: str, key: toupper-545f35af8166dab4fa206a01ed9fb48c>,
 <Future: finished, type: str, key: toupper-4c7e2009

In [63]:
A[0]

In [64]:
A[0].result()

'DICK JOHNSON IS DEAD'

In [65]:
tem = [result.result() for result in client.map(toupper, data['title'])]

In [66]:
tem[0]

'DICK JOHNSON IS DEAD'

In [68]:
### Convert tem to a np array

In [69]:
import dask.array as da

In [70]:
permutations = da.from_array(np.array(tem))

In [71]:
permutations

Unnamed: 0,Array,Chunk
Bytes,3.49 MiB,3.49 MiB
Shape,"(8807,)","(8807,)"
Count,1 Tasks,1 Chunks
Type,numpy.ndarray,
"Array Chunk Bytes 3.49 MiB 3.49 MiB Shape (8807,) (8807,) Count 1 Tasks 1 Chunks Type numpy.ndarray",8807  1,

Unnamed: 0,Array,Chunk
Bytes,3.49 MiB,3.49 MiB
Shape,"(8807,)","(8807,)"
Count,1 Tasks,1 Chunks
Type,numpy.ndarray,


In [72]:
data['random'] = permutations

In [73]:
data.head(1)

Unnamed: 0,show_id,type,title,director,cast,country,date_added,release_year,rating,duration,listed_in,description,random
0,s1,Movie,DICK JOHNSON IS DEAD,Kirsten Johnson,,United States,"September 25, 2021",2020,PG-13,90 min,Documentaries,"As her father nears the end of his life, filmm...",DICK JOHNSON IS DEAD


In [74]:
### As an example, parallalize a for loop

In [75]:
from time import sleep

def inc(x):
    sleep(1)
    return x + 1

def add(x,y):
    sleep(1)
    
data = [1,2,3,4,5,6,7,8]

In [76]:
%%time

results = []
for x in data:
    y = inc(x)
    results.append(y)
    
    
total = sum(results)

CPU times: user 2.02 s, sys: 538 ms, total: 2.55 s
Wall time: 8.04 s


In [77]:
### Now, use the 'delayed' function and call compute()

In [79]:
from dask import delayed

In [80]:
%%time

results = []
for x in data:
    y = delayed(inc)(x)
    results.append(y)
    
    
total = delayed(sum)(results)
print('before computing: {}'.format(total))  #Note: have to call compute() for distributed tasks
result = total.compute()
print('after computing: {}'.format(total))   #Calling the compute function

before computing: Delayed('sum-44ee3de8-2b65-429c-a641-990365875650')
after computing: Delayed('sum-44ee3de8-2b65-429c-a641-990365875650')
CPU times: user 1.16 s, sys: 309 ms, total: 1.47 s
Wall time: 4.11 s
