In [2]:
# pip install dask
# pip install "dask[diagnostics]" --upgrade

In [9]:
from platform import python_version

print(python_version())

3.11.10


# Dask practice
* [Dask Vs Pandas](#third-bullet)
* [Dataframes](#second-bullet)

In [3]:
import dask.dataframe as dd

In [5]:
df = dd.read_csv("weather_data.csv")
df

Unnamed: 0_level_0,day,temperature,windspeed,event
npartitions=1,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
,object,int64,int64,object
,...,...,...,...


In [7]:
df.head()

Unnamed: 0,day,temperature,windspeed,event
0,2024-01-01,32,6,Rain
1,2024-01-02,-35,7,Sunny
2,2024-01-03,28,2,Snow
3,2024-01-04,24,7,Snow
4,2024-01-05,32,4,Rain


## Dask Vs Pandas <a class="anchor" id="third-bullet"></a>

In [23]:
import pandas as pd
import dask.dataframe as dd
import time

In [25]:
# pandas

start_time = time.time()
pandas_df = pd.read_excel('oilgas.xlsx')

pandas_filtered_df = pandas_df[pandas_df['NormalizedOilEUR'] > 50]

pandas_grouped_df = pandas_df.groupby('FormationAlias').agg({'NormalizedOilEUR': 'mean'})

pandas_time = time.time() - start_time
print(f"Pandas operations took {pandas_time:.2f} seconds")

Pandas operations took 12.15 seconds


In [26]:
# Dask 

pandas_df = pd.read_excel('oilgas.xlsx')
start_time = time.time()
dask_df = dd.from_pandas(pandas_df, npartitions=4)

dask_filtered_df = dask_df[dask_df['NormalizedOilEUR'] > 50]

dask_grouped_df = dask_df.groupby('FormationAlias').agg({'NormalizedOilEUR': 'mean'})

dask_result = dask_grouped_df.compute()

dask_time = time.time() - start_time
print(f"Dask operations took {dask_time:.2f} seconds")

# Memory Comparison
print(f"Pandas memory usage: {pandas_df.memory_usage(deep=True).sum() / (1024**2):.2f} MB")

Dask operations took 0.01 seconds
Pandas memory usage: 2.60 MB


In [27]:
dask_df

Unnamed: 0_level_0,WellID,BVHH,FormationAlias,NioGOR,CodGOR,LateralLength,ProppantPerFoot,FluidPerFoot,LeftDistance,LeftNeighbourType,RightDistance,RightNeighbourType,TVD,NormalizedOilEUR,NormalizedGasEUR
npartitions=4,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1
0,int64,float64,object,float64,float64,float64,float64,float64,float64,object,float64,object,float64,float64,float64
2302,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
4604,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
6905,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
9205,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...


## Dask DataFrames <a class="anchor" id="second-bullet"></a>

In [31]:
import dask.dataframe as dd
import pandas as pd

In [33]:
df = pd.read_excel('oilgas.xlsx')
ddf = dd.from_pandas(df, npartitions=4)
ddf._meta # Many DataFrame operations rely on knowing the name and dtype of columns. 

Unnamed: 0,WellID,BVHH,FormationAlias,NioGOR,CodGOR,LateralLength,ProppantPerFoot,FluidPerFoot,LeftDistance,LeftNeighbourType,RightDistance,RightNeighbourType,TVD,NormalizedOilEUR,NormalizedGasEUR


In [37]:
ddf._meta.dtypes

WellID                  int64
BVHH                  float64
FormationAlias         object
NioGOR                float64
CodGOR                float64
LateralLength         float64
ProppantPerFoot       float64
FluidPerFoot          float64
LeftDistance          float64
LeftNeighbourType      object
RightDistance         float64
RightNeighbourType     object
TVD                   float64
NormalizedOilEUR      float64
NormalizedGasEUR      float64
dtype: object

In [39]:
ddf._meta_nonempty

Unnamed: 0,WellID,BVHH,FormationAlias,NioGOR,CodGOR,LateralLength,ProppantPerFoot,FluidPerFoot,LeftDistance,LeftNeighbourType,RightDistance,RightNeighbourType,TVD,NormalizedOilEUR,NormalizedGasEUR
0,1,1.0,foo,1.0,1.0,1.0,1.0,1.0,1.0,foo,1.0,foo,1.0,1.0,1.0
1,1,1.0,foo,1.0,1.0,1.0,1.0,1.0,1.0,foo,1.0,foo,1.0,1.0,1.0


In [51]:
ddf.describe().compute()

Unnamed: 0,WellID,BVHH,NioGOR,CodGOR,LateralLength,ProppantPerFoot,FluidPerFoot,LeftDistance,RightDistance,TVD,NormalizedOilEUR,NormalizedGasEUR
count,9206.0,7565.0,8709.0,7538.0,9206.0,8974.0,8942.0,6758.0,6778.0,8893.0,9206.0,9206.0
mean,718355400.0,1.031013,4699.70495,4219.441933,6970.06182,1013.557032,1035.336411,864.819473,877.969608,7013.938041,19.033831,105.802192
std,929525700.0,0.370794,12584.166286,3306.948828,2673.528748,529.75963,629.271011,925.304147,947.646937,616.775613,8.18722,66.225371
min,500109700.0,-0.125405,85.752059,336.47596,1749.0,0.0,0.001171,14.0,14.0,2610.0,0.0,0.0
25%,512337100.0,0.822785,2603.961103,2581.020606,4310.0,782.479378,768.361919,360.0,363.0,6924.5,14.387473,72.930225
50%,512340600.0,1.114846,3912.059746,3745.217424,7210.0,985.414101,969.383008,682.0,687.0,7065.0,18.433993,120.438532
75%,512344800.0,1.366772,5024.250073,5364.540326,10101.0,1503.474881,1677.03592,1006.75,1073.5,7684.5,24.034652,164.555622
max,4902129000.0,2.260032,357894.183679,26193.59272,16676.0,10217.314931,20117.640692,5275.0,5276.0,9793.0,96.959244,465.849396


In [13]:
ddf.npartitions

4

In [14]:
ddf.divisions

(0, 2302, 4604, 6905, 9205)

In [15]:
ddf.loc[2202:4004]  # inspects first two partitions

Unnamed: 0_level_0,WellID,BVHH,FormationAlias,NioGOR,CodGOR,LateralLength,ProppantPerFoot,FluidPerFoot,LeftDistance,LeftNeighbourType,RightDistance,RightNeighbourType,TVD,NormalizedOilEUR,NormalizedGasEUR
npartitions=2,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1
2202,int64,float64,object,float64,float64,float64,float64,float64,float64,object,float64,object,float64,float64,float64
2302,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
4004,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...


In [16]:
ddf.divisions # often we do not have such information about our partition when reading CSV files use df.set_index(...)

(0, 2302, 4604, 6905, 9205)

In [17]:
ddf.columns

Index(['WellID', 'BVHH', 'FormationAlias', 'NioGOR', 'CodGOR', 'LateralLength',
       'ProppantPerFoot', 'FluidPerFoot', 'LeftDistance', 'LeftNeighbourType',
       'RightDistance', 'RightNeighbourType', 'TVD', 'NormalizedOilEUR',
       'NormalizedGasEUR'],
      dtype='object')

In [18]:
result = ddf.groupby('FormationAlias').WellID.nunique()
computed_result = result.compute()
computed_result

FormationAlias
NIOBRARA    6917
CODELL      2289
Name: WellID, dtype: int64

You can set an index column using the .set_index(column_name) method. This operation is expensive though

In [19]:
mean_dask = ddf['BVHH'].mean()
mean_dask.compute()

np.float64(1.031013138081877)

In [20]:
# Dask can utilize multiple cores, enhancing performance for large data
ddf['NormalizedOilEUR power'] = ddf['NormalizedOilEUR'].map_partitions(lambda df: df * 2) #apply in pandas
ddf.compute()

Unnamed: 0,WellID,BVHH,FormationAlias,NioGOR,CodGOR,LateralLength,ProppantPerFoot,FluidPerFoot,LeftDistance,LeftNeighbourType,RightDistance,RightNeighbourType,TVD,NormalizedOilEUR,NormalizedGasEUR,NormalizedOilEUR power
0,500109742,1.105028,NIOBRARA,1687.414535,,8084.0,1256.308758,1234.370856,,NoNeighbour,,NoNeighbour,7501.0,15.234785,37.674048,30.469570
1,500109753,0.644480,NIOBRARA,2267.667384,,3912.0,742.842536,784.000000,,NoNeighbour,1330.0,Codeveloped,7551.0,5.327198,11.627301,10.654397
2,500109754,0.620268,NIOBRARA,2368.236087,,4137.0,912.738700,1127.047136,1330.0,Codeveloped,,NoNeighbour,7594.0,5.629925,14.544356,11.259850
3,500109760,0.649670,NIOBRARA,333.876618,,4161.0,914.683970,691.026676,,NoNeighbour,,NoNeighbour,7391.0,8.240087,7.012257,16.480173
4,500109772,1.423475,NIOBRARA,3608.065949,3199.962930,4418.0,747.271842,928.239928,,NoNeighbour,,NoNeighbour,7838.0,15.374830,51.169534,30.749660
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
9201,4902129125,,CODELL,1190.578936,949.853322,8583.0,1393.493883,2462.864733,1325.0,Codeveloped,,NoNeighbour,8613.0,18.513690,15.231737,37.027380
9202,4902129224,,CODELL,,397.429199,9882.0,2758.550901,1364.672738,,NoNeighbour,,NoNeighbour,7631.0,33.791844,12.357013,67.583688
9203,4902129225,,CODELL,,336.475960,9581.0,2531.677278,1290.361340,,NoNeighbour,,NoNeighbour,7592.0,27.725394,14.413422,55.450788
9204,4902129416,,CODELL,,519.208816,9768.0,1536.333026,1992.940418,,NoNeighbour,,NoNeighbour,7783.0,36.654894,35.377252,73.309787


In [21]:
result = ddf.groupby('FormationAlias').mean() #'numeric_only=False' is not implemented in Dask.
result.npartitions 

NotImplementedError: 'numeric_only=False' is not implemented in Dask.

In [22]:
numeric_cols = ddf.select_dtypes(include='number').columns
ddf[numeric_cols].groupby(ddf['FormationAlias']).mean().compute()

Unnamed: 0_level_0,WellID,BVHH,NioGOR,CodGOR,LateralLength,ProppantPerFoot,FluidPerFoot,LeftDistance,RightDistance,TVD,NormalizedOilEUR,NormalizedGasEUR,NormalizedOilEUR power
FormationAlias,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1
CODELL,1154323000.0,0.864766,3732.618172,3537.690482,7272.41274,859.162343,892.336873,1325.292715,1354.085865,7474.564045,18.985596,101.8203,37.971193
NIOBRARA,574083500.0,1.086811,5018.081961,4490.776987,6870.006702,1065.527839,1083.44461,732.328125,741.031915,6860.234853,19.049792,107.119895,38.099585


In [23]:
numeric_cols = ddf.select_dtypes(include='number').columns
ddf.groupby('FormationAlias').agg({col: 'var' for col in numeric_cols}).compute()

Unnamed: 0_level_0,WellID,BVHH,NioGOR,CodGOR,LateralLength,ProppantPerFoot,FluidPerFoot,LeftDistance,RightDistance,TVD,NormalizedOilEUR,NormalizedGasEUR,NormalizedOilEUR power
FormationAlias,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1
NIOBRARA,-1265199000000000.0,0.144409,203318900.0,11995970.0,7064571.0,303962.990319,353252.088376,735665.9,758518.1,308180.421812,66.371054,4350.173439,265.484215
CODELL,1241200000000000.0,0.079999,20587150.0,7626717.0,7280603.0,179601.100984,495867.820708,1002601.0,1091862.0,314012.78198,69.050295,4474.293621,276.20118


In [24]:
ddf.groupby('FormationAlias').count().compute()

Unnamed: 0_level_0,WellID,BVHH,NioGOR,CodGOR,LateralLength,ProppantPerFoot,FluidPerFoot,LeftDistance,LeftNeighbourType,RightDistance,RightNeighbourType,TVD,NormalizedOilEUR,NormalizedGasEUR,NormalizedOilEUR power
FormationAlias,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1
CODELL,2289,1901,2157,2146,2289,2260,2251,1510,2289,1514,2289,2225,2289,2289,2289
NIOBRARA,6917,5664,6552,5392,6917,6714,6691,5248,6917,5264,6917,6668,6917,6917,6917


In [57]:
filtered_df = ddf[ddf['LateralLength'] > 7210].count()
filtered_df.compute()

WellID                4119
BVHH                  2821
FormationAlias        4119
NioGOR                3739
CodGOR                3268
LateralLength         4119
ProppantPerFoot       4098
FluidPerFoot          4089
LeftDistance          2967
LeftNeighbourType     4119
RightDistance         2952
RightNeighbourType    4119
TVD                   3870
NormalizedOilEUR      4119
NormalizedGasEUR      4119
dtype: int64

### Special APIs in Dask

In [25]:
ddf = ddf.map_partitions(lambda df: df['NormalizedGasEUR'] * 2)
ddf.compute() 
# Apply a function to each partition (chunk) of the DataFrame independently.
# very powerful for custom transformations or applying operations that aren't natively supported by Dask.

0        75.348095
1        23.254601
2        29.088712
3        14.024513
4       102.339067
           ...    
9201     30.463474
9202     24.714026
9203     28.826845
9204     70.754505
9205     22.826983
Name: NormalizedGasEUR, Length: 9206, dtype: float64

In [26]:
ddf._meta

Series([], Name: NormalizedGasEUR, dtype: float64)

In [27]:
ddf = ddf.repartition(npartitions=10)

In [28]:
ddf.npartitions

10

### Distributed Computing
Dask can run across multiple machines in a cluster.

In [64]:
pip install dask distributed

Note: you may need to restart the kernel to use updated packages.


In [75]:
from dask.distributed import Client

# Start a local cluster
client = Client()
print(client)

Perhaps you already have a cluster running?
Hosting the HTTP server on port 59458 instead


<Client: 'tcp://127.0.0.1:59461' processes=5 threads=20, memory=15.69 GiB>


In [69]:
pip install dask[distributed]

Note: you may need to restart the kernel to use updated packages.


In [77]:
import dask.dataframe as dd
from dask.distributed import Client
import time
# Connect to the Dask cluster 
client = Client('tcp://127.0.0.1:59461')  # Change 'scheduler-address' as needed
start_time = time.time()
mean_bvhh = ddf['BVHH'].mean().compute()
end_time = time.time()
time_taken = end_time - start_time

print(f"Mean BVHH: {mean_bvhh}")
print(f"Time taken: {time_taken:.4f} seconds")

# shut down the client after computation
client.close()

Mean BVHH: 1.031013138081877
Time taken: 0.7508 seconds


In [79]:
start_time = time.time()
mean_bvh = ddf['BVHH'].mean().compute()
end_time = time.time()
time_taken = end_time - start_time

print(f"Mean BVHH: {mean_bvh}")
print(f"Time taken: {time_taken:.4f} seconds")

Mean BVHH: 1.031013138081877
Time taken: 1.1556 seconds


In [None]:
df_dask_persisted = ddf.persist() # Persist data in memory for faster repeated computations

In [None]:
# Save to Parquet format, which is optimized for performance
combined_df.to_parquet('output.parquet')