# Modin
### 1. What is Modin?
Information retrieval in industry usually deals with high-dimensional, large-scale data. The classic library `pandas` is the traditional tool for processing such data, but many researchers have found that it becomes unbearably slow once datasets reach terabyte scale. A major reason is that `pandas` is designed to run on a single thread. Given how many cores a modern CPU has, this leaves most of the machine idle. `Modin` was developed to address this situation.
> Modin is an early-stage project at [UC Berkeley's RISELab](https://rise.cs.berkeley.edu/) designed to facilitate the use of distributed computing for Data Science. It is a multiprocess Dataframe library with an **identical API to pandas** that allows users to speed up their Pandas workflows.

### 2. Pandas vs. Modin
The `modin.pandas` DataFrame is an extremely lightweight parallel DataFrame. With `pandas` you can use only one core at a time, whereas `Modin` currently uses `Ray` or `Dask` as its backend computation engine to run work in parallel.
`Modin` builds on a simple MapReduce-style idea. Its DataFrame architecture follows in the footsteps of modern architectures for database and high-performance matrix systems: a `Modin` DataFrame is partitioned along both rows and columns, and the resulting blocks are distributed across CPU cores. Section 3.1 discusses this partitioning scheme in more detail.
On a 4-core machine, `read_csv` runs roughly 4 times faster, as the benchmark below shows.
![read_csv test](https://modin.readthedocs.io/en/latest/_images/read_csv_benchmark.png)

### 3. Architecture
#### 3.1 DataFrame Partitioning
`Modin` partitions data along both columns and rows, which gives it flexibility and scalability in both the number of columns and the number of rows it supports. For example, a skewed dataset with few rows but many columns is awkward for libraries that partition only along rows, since there are too few rows to split into useful pieces. ![comparison between pandas and modin](https://miro.medium.com/max/1000/0*GYnWlFf7QyueGWEm.png)
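
As a rough illustration of the idea (not Modin's actual implementation), the sketch below splits a small table, held as a list of rows, into blocks along both axes and then maps a function over every block before combining the partial results. The helper names `partition_2d`, `block_sum`, and `parallel_total` are hypothetical, and threads from the standard library stand in for the separate workers a real engine would use.

```python
from concurrent.futures import ThreadPoolExecutor


def partition_2d(table, row_chunk, col_chunk):
    """Split a list-of-rows table into blocks along both rows and columns."""
    n_rows = len(table)
    n_cols = len(table[0]) if table else 0
    blocks = []
    for r in range(0, n_rows, row_chunk):
        block_row = []
        for c in range(0, n_cols, col_chunk):
            block = [row[c:c + col_chunk] for row in table[r:r + row_chunk]]
            block_row.append(block)
        blocks.append(block_row)
    return blocks


def block_sum(block):
    """Map step: reduce one block to a partial sum."""
    return sum(sum(row) for row in block)


def parallel_total(table, row_chunk=2, col_chunk=2, workers=4):
    """Sum every cell by mapping over blocks, then combining partial results."""
    blocks = partition_2d(table, row_chunk, col_chunk)
    flat = [block for block_row in blocks for block in block_row]
    with ThreadPoolExecutor(max_workers=workers) as pool:
        partials = list(pool.map(block_sum, flat))
    return sum(partials)


# A "wide" table: 2 rows, 6 columns. Row-only partitioning would give at most
# 2 partitions here, while 2-D blocks still yield 6 units of work.
wide = [[1, 2, 3, 4, 5, 6],
        [7, 8, 9, 10, 11, 12]]
grid = partition_2d(wide, row_chunk=1, col_chunk=2)
total = parallel_total(wide, row_chunk=1, col_chunk=2)
print(len(grid), "x", len(grid[0]), "blocks, total =", total)
```

With only two rows, the column split is what produces enough independent blocks to keep several workers busy.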

#### 3.2 System Architecture
`Modin` is organized into layers. The `pandas` API is exposed at the topmost layer. Below it sits the Query Compiler, which receives queries from the `pandas` API layer and optimizes them. At the bottom is the Partition Manager, which is responsible for data layout, shuffling, partitioning, and serializing the tasks. ![architecture of Modin](https://miro.medium.com/max/753/0*cOcYOJ3Lib0_04Ji.png)
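
To make the layering concrete, here is a toy sketch of the three layers in plain Python. It is only an analogy under simplifying assumptions: the class names `MiniFrame`, `QueryCompiler`, and `PartitionManager` below are hypothetical stand-ins, rows are plain dictionaries, and partitions are processed sequentially rather than on separate workers.

```python
class PartitionManager:
    """Bottom layer: owns the data layout (here, fixed-size row chunks)."""

    def __init__(self, rows, chunk_size=2):
        self.partitions = [rows[i:i + chunk_size]
                           for i in range(0, len(rows), chunk_size)]

    def map_partitions(self, func):
        # Apply func to each partition independently; a real engine would
        # schedule these calls on separate workers.
        return [func(part) for part in self.partitions]


class QueryCompiler:
    """Middle layer: turns an API-level request into partition-level work."""

    def __init__(self, manager):
        self.manager = manager

    def count(self):
        return sum(self.manager.map_partitions(len))

    def filter(self, predicate):
        kept = self.manager.map_partitions(
            lambda part: [row for row in part if predicate(row)])
        return [row for part in kept for row in part]


class MiniFrame:
    """Top layer: the user-facing, pandas-like API."""

    def __init__(self, rows):
        self._compiler = QueryCompiler(PartitionManager(rows))

    def __len__(self):
        return self._compiler.count()

    def query(self, predicate):
        return MiniFrame(self._compiler.filter(predicate))


frame = MiniFrame([{"open": 1.0, "close": 1.5},
                   {"open": 2.0, "close": 1.8},
                   {"open": 3.0, "close": 3.2}])
gains = frame.query(lambda row: row["close"] > row["open"])
print(len(frame), len(gains))
```

The user only ever touches the top layer, so the lower layers can change how data is split or scheduled without affecting calling code.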


### 4. How to use Modin
#### 4.1 Installation

In [29]:
# install Modin together with the Dask computation engine
! pip install numpy --upgrade 
! pip install modin
! pip install modin[dask]

Requirement already up-to-date: numpy in e:\anaconda\lib\site-packages (1.18.2)


In [35]:
! python -m pip install dask[dataframe]

Collecting fsspec>=0.6.0; extra == "dataframe"
  Downloading fsspec-0.6.3-py3-none-any.whl (65 kB)
Collecting partd>=0.3.10; extra == "dataframe"
  Downloading partd-1.1.0-py3-none-any.whl (19 kB)
Installing collected packages: fsspec, partd
  Attempting uninstall: partd
    Found existing installation: partd 0.3.8
    Uninstalling partd-0.3.8:
      Successfully uninstalled partd-0.3.8
Successfully installed fsspec-0.6.3 partd-1.1.0


#### 4.2 Using Modin

In [36]:
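# NOTE: this binds Dask's own DataFrame API to the name pd_modin,
# so the cells below run on Dask directly.
import dask.dataframe as pd_modin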
import pandas as pd
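
For reference, Modin itself is normally imported as `modin.pandas`, and the engine can be chosen through the `MODIN_ENGINE` environment variable before that import. The sketch below assumes Modin and Dask were installed as in section 4.1 and simply records whether the import succeeded; the alias `mpd` and the flag `HAVE_MODIN` are hypothetical names used only for illustration.

```python
import os

# Select the Dask engine; this must be set before modin.pandas is imported.
os.environ["MODIN_ENGINE"] = "dask"

try:
    import modin.pandas as mpd  # intended as a drop-in for `import pandas as pd`
    HAVE_MODIN = True
except ImportError:
    mpd = None
    HAVE_MODIN = False

print("Modin available:", HAVE_MODIN)
```

When the import succeeds, calls such as `mpd.read_csv(...)` are expected to return a Modin DataFrame rather than a Dask one.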

Now we are ready to use the parallel DataFrame just as we use `pandas`. I am testing with a 143MB [decompressed file](http://www.kaggle.com/camnugent/sandp500/data) on my laptop with 4 physical cores and 8GB of memory.

In [44]:
%%time
df_modin = pd_modin.read_csv("all_stocks_5yr.csv")

Wall time: 16 ms


In [63]:
print(df_modin.head())

         date   open   high    low  close    volume Name
0  2013-02-08  15.07  15.12  14.63  14.75   8407500  AAL
1  2013-02-11  14.89  15.01  14.26  14.46   8882000  AAL
2  2013-02-12  14.45  14.51  14.10  14.27   8126000  AAL
3  2013-02-13  14.30  14.94  14.25  14.66  10259500  AAL
4  2013-02-14  14.94  14.96  13.16  13.99  31879900  AAL


In [39]:
type(df_modin)

dask.dataframe.core.DataFrame

In [48]:
df_modin.columns

Index(['date', 'open', 'high', 'low', 'close', 'volume', 'Name'], dtype='object')

We can start with a simple query, such as how many days ended with positive gains.

In [47]:
%%time
pos_df = df_modin.query("close > open")
print(pos_df['date'].head(n=10))
print("\nNumber of positive days:", pos_df.size)

3     2013-02-13
5     2013-02-15
11    2013-02-26
12    2013-02-27
14    2013-03-01
15    2013-03-04
16    2013-03-05
17    2013-03-06
18    2013-03-07
20    2013-03-11
Name: date, dtype: object

Number of positive days: dd.Scalar<size-ag..., dtype=int32>
Wall time: 449 ms
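
The `dd.Scalar<...>` printed above is a deferred value rather than a number: Dask builds a task graph and evaluates it only on request, typically through `.compute()`. The standard-library sketch below mimics that behaviour with a hypothetical `Lazy` class, to show why building an operation is fast while materializing it is where the work happens. The `slow_load` function and its 0.2 s sleep are stand-ins for parsing a large file.

```python
import time


class Lazy:
    """A deferred computation: stores a function and runs it only on compute()."""

    def __init__(self, func, *deps):
        self.func = func
        self.deps = deps

    def compute(self):
        # Evaluate dependencies first, then this node.
        return self.func(*(dep.compute() for dep in self.deps))


def slow_load():
    time.sleep(0.2)  # stand-in for parsing a large CSV file
    return list(range(1_000))


start = time.perf_counter()
data = Lazy(slow_load)                      # nothing runs yet
odd_rows = Lazy(lambda rows: [r for r in rows if r % 2], data)
size = Lazy(len, odd_rows)
build_seconds = time.perf_counter() - start

start = time.perf_counter()
result = size.compute()                     # the actual work happens here
compute_seconds = time.perf_counter() - start

print(f"build: {build_seconds:.4f}s, compute: {compute_seconds:.4f}s, size={result}")
```

Also note that in pandas `DataFrame.size` is the number of cells (rows times columns), so `len(pos_df)` is likely closer to the intended count of positive days.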


In [56]:
%%time
count_by_Time = df_modin.groupby(by="date").count()

Wall time: 13.9 ms


In [57]:
count_by_Time.head()

            open  high  low  close  volume  Name
date
2013-02-08   476   476  476    476     476   476
2013-02-11   476   476  476    476     476   476
2013-02-12   476   476  476    476     476   476
2013-02-13   476   476  476    476     476   476
2013-02-14   476   476  476    476     476   476


We can see that `Modin` integrates the commonly used parts of the `pandas` API.

Next I will compare `Modin` with the Dask engine against raw `pandas`. Since `Modin` is still working toward full coverage of the `pandas` API, I only present some commonly used features here. The dataset I am using can be found [here](https://www.kaggle.com/wendykan/lending-club-loan-data) (1.1GB).

In [64]:
import pandas as old_pd

First we will compare how long it takes to load a CSV file with traditional `pandas` and with `Modin`.

In [73]:
# Modin with dask engine
print("Modin with dask engine: ")
%time pandas_on_modin = pd_modin.read_csv("loan.csv")

# raw Pandas
print("\n Pandas")
%time pandas_raw = pd.read_csv("loan.csv")

Modin with dask engine: 
Wall time: 285 ms

 Pandas
Wall time: 44.1 s


In [81]:
print(pandas_raw.head())

   id  member_id  loan_amnt  funded_amnt  funded_amnt_inv        term  \
0 NaN        NaN       2500         2500           2500.0   36 months   
1 NaN        NaN      30000        30000          30000.0   60 months   
2 NaN        NaN       5000         5000           5000.0   36 months   
3 NaN        NaN       4000         4000           4000.0   36 months   
4 NaN        NaN      30000        30000          30000.0   60 months   

   int_rate  installment grade sub_grade  ... hardship_payoff_balance_amount  \
0     13.56        84.92     C        C1  ...                            NaN   
1     18.94       777.23     D        D2  ...                            NaN   
2     17.97       180.69     D        D1  ...                            NaN   
3     18.94       146.51     D        D2  ...                            NaN   
4     16.14       731.78     C        C4  ...                            NaN   

  hardship_last_payment_amount disbursement_method  debt_settlement_flag  \
0   

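We can see that `Modin` with the Dask engine is about 155 times faster than raw `pandas` (44.1 s versus 285 ms), which looks quite impressive. Note, however, that `pd_modin` here is `dask.dataframe`, whose `read_csv` is lazy: the 285 ms covers building the task graph, not parsing the whole file, so the two timings are not directly comparable. As described earlier, `Modin` partitions data along both rows and columns and can therefore use multiple CPU cores to complete the task. To probe this, we can try to collect all the data and see how long that takes.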

In [71]:
# Modin with dask engine
print("Modin with dask engine: ")
%time entire_df_mod = pandas_on_modin[:]

# raw Pandas
print("\n Pandas")
%time entire_df_raw = pandas_raw[:]

Modin with dask engine: 
Wall time: 2.99 ms

 Pandas
Wall time: 0 ns


Here `Modin` with the Dask engine is slower than raw `pandas`, although both timings are tiny. The `[:]` operator is meant to collect all the data. With parallelization, all the threads read the file in parallel and serialize the data in different places; when the data has to be collected, a single thread deserializes it, which makes that step the bottleneck. Note that on a Dask DataFrame `[:]` is itself a lazy slice, and a full collection is triggered by `.compute()`. Next, we look at accessing the index of each DataFrame.

In [79]:
# Modin with dask engine
print("Modin with dask engine: ")
%time pandas_on_modin.index

# raw Pandas
print("\n Pandas")
%time pandas_raw.index

Modin with dask engine: 
Wall time: 0 ns

 Pandas
Wall time: 0 ns


RangeIndex(start=0, stop=2260668, step=1)

Next, let's see how `pandas` and `Modin` perform in query processing.

In [82]:
# Modin with dask engine
print("Modin with dask engine: ")
%timeit q0 =  pandas_on_modin.query("int_rate > 15")

# raw Pandas
print("\n Pandas")
%timeit q1 = pandas_raw.query("int_rate > 15")

Modin with dask engine: 
21.1 ms ± 1.18 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)

 Pandas
The slowest run took 6.05 times longer than the fastest. This could mean that an intermediate result is being cached.
1.51 s ± 1.27 s per loop (mean ± std. dev. of 7 runs, 1 loop each)


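We can see that in this `timeit` call, `Modin` is roughly 70 times faster than raw `pandas` (21.1 ms versus 1.51 s). As with `read_csv`, `query` on a Dask DataFrame is lazy, so the smaller figure mostly reflects building the task graph rather than filtering the rows.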

### 5. References
* [RAY vs DASK](https://gist.github.com/devin-petersohn/f424d9fb5579a96507c709a36d487f24#file-pandas_on_ray_blog_post_0-ipynb)
* [Modin Tutorial](https://modin.readthedocs.io/en/latest/using_modin.html#using-modin-on-a-single-node)
* [Modin](https://www.zhihu.com/question/24590883)
