In [1]:
import warnings
warnings.filterwarnings("ignore")

from dask.distributed import Client, progress

client = Client(n_workers=4, threads_per_worker=2, memory_limit='2GB')
client

0,1
Client  Scheduler: tcp://127.0.0.1:51183  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 4  Cores: 8  Memory: 8.00 GB


In [2]:
import dask.dataframe as dd

# assume_missing=True to assume float vars to avoid errors with later df methods
# can also explicitly state var types
df = dd.read_csv('https://tf-assets-prod.s3.amazonaws.com/tf-curric/data-science/creditcard.csv', assume_missing=True)
df.head()

Unnamed: 0,Time,V1,V2,V3,V4,V5,V6,V7,V8,V9,...,V21,V22,V23,V24,V25,V26,V27,V28,Amount,Class
0,0.0,-1.359807,-0.072781,2.536347,1.378155,-0.338321,0.462388,0.239599,0.098698,0.363787,...,-0.018307,0.277838,-0.110474,0.066928,0.128539,-0.189115,0.133558,-0.021053,149.62,0.0
1,0.0,1.191857,0.266151,0.16648,0.448154,0.060018,-0.082361,-0.078803,0.085102,-0.255425,...,-0.225775,-0.638672,0.101288,-0.339846,0.16717,0.125895,-0.008983,0.014724,2.69,0.0
2,1.0,-1.358354,-1.340163,1.773209,0.37978,-0.503198,1.800499,0.791461,0.247676,-1.514654,...,0.247998,0.771679,0.909412,-0.689281,-0.327642,-0.139097,-0.055353,-0.059752,378.66,0.0
3,1.0,-0.966272,-0.185226,1.792993,-0.863291,-0.010309,1.247203,0.237609,0.377436,-1.387024,...,-0.1083,0.005274,-0.190321,-1.175575,0.647376,-0.221929,0.062723,0.061458,123.5,0.0
4,2.0,-1.158233,0.877737,1.548718,0.403034,-0.407193,0.095921,0.592941,-0.270533,0.817739,...,-0.009431,0.798278,-0.137458,0.141267,-0.20601,0.502292,0.219422,0.215153,69.99,0.0


In [3]:
# same methods and syntax as panda dataframes, but .compute() must be used bc of lazy evaluation
df[df['Amount'] > 10000].compute()

Unnamed: 0,Time,V1,V2,V3,V4,V5,V6,V7,V8,V9,...,V21,V22,V23,V24,V25,V26,V27,V28,Amount,Class
46841,42951.0,-23.712839,-42.172688,-13.320825,9.925019,-13.945538,5.564891,15.710644,-2.844253,-1.580725,...,7.9216,-6.32071,-11.310338,0.404175,-4.547278,-1.577118,-2.357385,2.253662,12910.93,0.0
54018,46253.0,-21.780665,-38.30531,-12.122469,9.752791,-12.880794,4.256017,14.785051,-2.818253,-0.667338,...,7.437478,-5.619439,-10.547038,0.653249,-4.232409,-0.480459,-2.257913,2.082488,11898.09,0.0
58465,48401.0,-36.80232,-63.344698,-20.645794,16.715537,-20.672064,7.694002,24.956587,-4.730111,-2.687312,...,11.455313,-10.933144,-17.173665,1.1807,-7.025783,-2.53433,-3.602479,3.450224,19656.53,0.0
30325,95286.0,-34.549296,-60.464618,-21.340854,16.875344,-19.229075,6.335259,24.422716,-4.964566,0.188912,...,11.50258,-9.499423,-16.513186,0.744341,-7.081325,-2.604551,-3.550963,3.250802,18910.0,0.0
48486,119713.0,-20.924897,-37.943452,-14.060281,10.473005,-10.866639,6.256654,14.960521,-2.392155,-0.597076,...,6.82981,-6.926353,-9.928657,-0.447084,-4.848151,-2.24162,-2.140723,2.001492,11789.84,0.0
32983,166198.0,-35.548539,-31.850484,-48.325589,15.304184,-113.743307,73.301626,120.589494,-27.34736,-3.872425,...,-21.62012,5.712303,-1.581098,4.584549,4.554683,3.415636,31.612198,-15.430084,25691.16,0.0
42461,172273.0,-9.030538,-11.112584,-16.233798,3.592021,-40.427726,23.917837,44.054461,-7.277778,-4.210637,...,-0.269048,0.988144,7.040028,0.347693,2.520869,2.342495,3.478175,-2.713136,10199.44,0.0


In [4]:
df2 = df.groupby("Class")["Amount"].mean().compute()
df2

Class
0.0     88.291022
1.0    122.211321
Name: Amount, dtype: float64

In [5]:
df["Amount2"] = df["Amount"].apply(lambda x: 1 if x>10000 else 0, meta=('Amount2', 'int64'))
df.Amount2.head(20)

0     0
1     0
2     0
3     0
4     0
5     0
6     0
7     0
8     0
9     0
10    0
11    0
12    0
13    0
14    0
15    0
16    0
17    0
18    0
19    0
Name: Amount2, dtype: int64

> This type of operation lends itself well to parallelization. In the example above, the apply() function operates row by row, and each operation is independent of the others. Hence, rows can be distributed over several cores, or even over several machines.

---
> So far, you've seen that you can load large amounts of data into Dask DataFrames and do parallel computations on them. However, Dask can be convenient even if you have a sufficient amount of RAM in your computer. If you have the available memory for a dataset, then you can persist the data in memory and take advantage of the memory speed. Dask will still be useful when parallelizing the operations. If you do this, all of the future computations on the persisted DataFrame will be much faster.

> To persist a DataFrame to memory, just call the persist() method of the Dask DataFrames:

In [6]:
df = df.persist()

***Drawbacks of Dask DataFrames***

As you've seen in this checkpoint, working with Dask DataFrames is very similar to working with pandas DataFrames. That being said, you should be aware of the fact that the Dask.dataframe package only covers a small but well-used portion of the pandas API. This because of the following two reasons:

* The pandas API is huge, and Dask still has a way to go to increase its coverage.
* Some operations, like sorting, are hard to do in parallel. Hence, not all of the functionalities of pandas are on the Dask DataFrames.

Additionally, some important operations like set_index() work, but they work slower than in pandas because sorting includes substantial shuffling of the data. Because Dask stores the data in a Dask DataFrame in several pandas DataFrames, it sometimes needs to write out to disk in order to coordinate the sorting operation. This causes overhead.

In [7]:
# client.close()

# Assignment

1. How many transactions are there in total?
2. How many transactions are fraud, and how many are not fraud?
3. What is the maximum amount in fraud transactions?

In [17]:
df.head()

Unnamed: 0,Time,V1,V2,V3,V4,V5,V6,V7,V8,V9,...,V22,V23,V24,V25,V26,V27,V28,Amount,Class,Amount2
0,0.0,-1.359807,-0.072781,2.536347,1.378155,-0.338321,0.462388,0.239599,0.098698,0.363787,...,0.277838,-0.110474,0.066928,0.128539,-0.189115,0.133558,-0.021053,149.62,0.0,0
1,0.0,1.191857,0.266151,0.16648,0.448154,0.060018,-0.082361,-0.078803,0.085102,-0.255425,...,-0.638672,0.101288,-0.339846,0.16717,0.125895,-0.008983,0.014724,2.69,0.0,0
2,1.0,-1.358354,-1.340163,1.773209,0.37978,-0.503198,1.800499,0.791461,0.247676,-1.514654,...,0.771679,0.909412,-0.689281,-0.327642,-0.139097,-0.055353,-0.059752,378.66,0.0,0
3,1.0,-0.966272,-0.185226,1.792993,-0.863291,-0.010309,1.247203,0.237609,0.377436,-1.387024,...,0.005274,-0.190321,-1.175575,0.647376,-0.221929,0.062723,0.061458,123.5,0.0,0
4,2.0,-1.158233,0.877737,1.548718,0.403034,-0.407193,0.095921,0.592941,-0.270533,0.817739,...,0.798278,-0.137458,0.141267,-0.20601,0.502292,0.219422,0.215153,69.99,0.0,0


In [16]:
# total number of transactions
df.Time.count().compute()

284807

In [20]:
# number of fraud transactions and normal transactions
df.groupby("Class")["Time"].count().compute()

Class
0.0    284315
1.0       492
Name: Time, dtype: int64

In [21]:
# max fraud transaction amount
df.Amount.max().compute()

25691.16