# Porting Existing Code to Distributed Computing

Data scientists often run into a scenario where they have to port existing Pandas code to Spark or Dask. Either they start with a small data project and then move it to a larger dataset to run in production, or they have existing code and programs that are struggling to scale. The limitations of Pandas are well documented:

The primary limitation is that Pandas is single core and does not take advantage of all available compute resources. A lot of operations also generate [intermediate copies](https://pandas.pydata.org/pandas-docs/stable/user_guide/scale.html#scaling-to-large-datasets) of data, utilizing more memory than necessary. To effectively handle data with Pandas, users preferably need to have [5x to 10x](https://wesmckinney.com/blog/apache-arrow-pandas-internals/) as much RAM as the size of the dataset.
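
As a rough illustration of this rule of thumb, the sketch below measures the in-memory size of a small DataFrame with `DataFrame.memory_usage(deep=True)` and multiplies it by the 5x to 10x factor. The data and variable names are made up for illustration only.

```python
import pandas as pd

# Hypothetical toy data; a real dataset would be loaded from disk instead
df = pd.DataFrame({
    "id": range(1_000),
    "label": ["some text value"] * 1_000,
})

# deep=True also counts the contents of string columns, not just pointers
dataset_bytes = int(df.memory_usage(deep=True).sum())

# Rule of thumb from above: keep 5x to 10x the dataset size available as RAM
low, high = 5 * dataset_bytes, 10 * dataset_bytes
print(f"dataset: {dataset_bytes / 1e6:.2f} MB")
print(f"suggested RAM: {low / 1e6:.2f} to {high / 1e6:.2f} MB")
```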

Spark and Dask allow us to split compute jobs across multiple machines. They also can handle datasets that don’t fit into memory by [spilling data](http://distributed.dask.org/en/latest/worker.html#spill-data-to-disk) over to disk in some cases.

## Current Approaches

### Vertical Scaling

The most common approach is to scale the compute vertically so that no rewrite of the code is needed. Instead of running everything on a 16 GB VM, we can run it on a 32 GB VM. When that isn't good enough anymore, we can run the program on a 64 GB VM. The problem is that this is often not a good use of resources, for the following reasons:

1. You likely only need more compute resources for one step out of many. For example, if the dataset has already been reduced before machine learning modelling, then you don't need a big VM during the modelling step.
2. Scaling vertically does not automatically mean complete utilization of CPUs. A lot of people frequently scale the underlying virtual machine, but don't introduce parallelism so other cores are not utilized.

### DIY Parallelism

It's also common for people to introduce their own form of parallelism. A common example of this is sharding a CSV or Parquet file into several smaller files, and then spinning up a Python process for each one with a process pool. For example:

```python
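import concurrent.futures
import pandas as pd

def logic(file_name):
    # Each worker process loads one shard, transforms it, and writes the result
    shard = pd.read_csv(file_name)
    result = do_something(shard)  # placeholder for the actual transformation
    result.to_csv(f"processed-{file_name}")

# `files` is the list of shard file names
with concurrent.futures.ProcessPoolExecutor() as executor:
    # executor.map returns an iterator of results, not futures;
    # consuming it waits for every shard and re-raises any worker error
    list(executor.map(logic, files))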
```

The DIY parallelism approach can have issues, though. The most common issue is resource contention, because the memory and CPU consumption of these processes is not fixed. [This StackOverflow post](https://stackoverflow.com/questions/71151809/python-processpoolexecutor-memory-problems) is an example. Memory management falls on the user: you have to chunk the data yourself, and each process has no way of knowing the overall consumption.

Second, this assumes that everything runs on the same machine, but there is room to do better if we are open to scaling out.
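
One way to do the manual chunking described above is the `chunksize` argument of `pd.read_csv`, which yields the file piece by piece so that only one chunk is in memory at a time. This is a minimal sketch: the in-memory CSV and the `chunked_sum` helper are made up for illustration, and it only works for logic that can be combined across chunks, such as a sum.

```python
import io
import pandas as pd

def chunked_sum(csv_source, column, chunksize):
    """Sum one column while holding at most `chunksize` rows in memory."""
    total = 0
    # With chunksize set, read_csv returns an iterator of DataFrames
    for chunk in pd.read_csv(csv_source, chunksize=chunksize):
        total += chunk[column].sum()
    return total

# Hypothetical stand-in for a large CSV file on disk
csv_text = "a,b\n" + "\n".join(f"{i},{i * 2}" for i in range(10))
result = chunked_sum(io.StringIO(csv_text), column="b", chunksize=3)
print(result)  # 90
```

Picking `chunksize` is still the user's job, which is exactly the kind of bookkeeping a distributed framework takes over.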

## Distributed Computing Frameworks

This brings us to distributed computing frameworks. Distributed just means we are scaling out over a cluster, so the data lives on multiple machines. The [Dask machine learning documentation](https://ml.dask.org/) shows the dimensions of scale: there are compute-bound problems and memory-bound problems.

<img src="https://ml.dask.org/_images/dimensions_of_scale.svg" align="middle" width="700"/>


Distributed computing frameworks such as Spark and Dask scale out to a cluster of machines.

<img src="img/spark_dask_ray.png" align="center" width="800"/>

There is an image in the Dask repo [issues](https://github.com/dask/dask/issues/4471) that clearly illustrates the distributed computing paradigm. In general, there is a client or master that takes care of the orchestration and final data collection. The client is responsible for scheduling tasks among workers.

Both Spark and Dask also have local modes where they use the cores available on the local machine. This means we can still take advantage of parallel processing without having a cluster available.

In the diagram below, note the following:
- package versions and serialization matter
- reading in files can be optimized
- data actually lives on a physical machine

<img src="https://user-images.githubusercontent.com/11656932/62263986-bbba2f00-b3e3-11e9-9b5c-8446ba4efcf9.png" align="left" width="700"/>

## Introduction to Partitions

In order to understand partitions, we can look at this image showing the way Dask scales Pandas. Each partition is a Pandas DataFrame. A Dask DataFrame is the collection of all of the Pandas DataFrames. Operations are done on each partition, and then aggregated back.

<img src="https://docs.dask.org/en/latest/_images/dask-dataframe.svg" align="center" width="400"/>

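To make the idea concrete without a cluster, the sketch below imitates partitions in plain Pandas: the DataFrame is cut into row-wise pieces, a partial result is computed on each piece, and the partial results are combined. This only illustrates the concept; it is not how Dask is implemented, and the `partition` helper is made up for the example.

```python
import pandas as pd

def partition(df, npartitions):
    """Split a DataFrame into row-wise chunks of roughly equal size."""
    size = -(-len(df) // npartitions)  # ceiling division
    return [df.iloc[i:i + size] for i in range(0, len(df), size)]

df = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6, 7]})
parts = partition(df, npartitions=3)

# Step 1: each partition computes a small partial result on its own
partials = [(part["x"].sum(), len(part)) for part in parts]

# Step 2: the partial results are aggregated into the final answer
total = sum(s for s, _ in partials)
count = sum(n for _, n in partials)
mean = total / count

print(len(parts), mean)  # 3 4.0
```

The partial results carry both a sum and a count because the partitions have unequal sizes, so averaging the per-partition means would give the wrong answer.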
## [Reference on Partitions](https://blog.scottlogic.com/2018/03/22/apache-spark-performance.html) by Scott Logic

This reference has a lot of good images and explanations.

### Ideal Partitioning Strategy
![Partitioning](https://blog.scottlogic.com/mdebeneducci/assets/Ideal-Partitioning.png)
### Skewed Partitions
![Skewed Partitions](https://blog.scottlogic.com/mdebeneducci/assets/Skewed-Partitions.png)
### Inefficient Scheduling
![Inefficient Scheduling](https://blog.scottlogic.com/mdebeneducci/assets/Inefficient-Scheduling.png)
### Data Shuffling
![Shuffle](https://blog.scottlogic.com/mdebeneducci/assets/Shuffle-Diagram.png)
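
The effect of skew in the images above can be approximated with a few lines of standard-library Python. The sketch assigns rows to partitions by hashing a key, which is a common strategy but an assumption here rather than the exact scheme of Spark or Dask; the keys and counts are made up.

```python
import zlib
from collections import Counter

def partition_sizes(keys, npartitions):
    """Count how many rows land in each partition under hash partitioning."""
    sizes = Counter()
    for key in keys:
        # crc32 gives a stable hash, unlike Python's salted str hash
        sizes[zlib.crc32(key.encode()) % npartitions] += 1
    return [sizes[i] for i in range(npartitions)]

# Balanced keys: 100 distinct customers with 10 rows each
balanced = [f"customer-{i}" for i in range(100) for _ in range(10)]
# Skewed keys: one customer owns 910 of the 1000 rows
skewed = ["customer-0"] * 910 + [f"customer-{i}" for i in range(1, 91)]

for name, keys in [("balanced", balanced), ("skewed", skewed)]:
    sizes = partition_sizes(keys, npartitions=4)
    # The largest partition bounds how long the slowest worker runs
    print(name, sizes, "largest:", max(sizes))
```

All rows with the same key must land in the same partition, so one dominant key forces one oversized partition no matter how many workers are available.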

## Code Comparison

### Mapping a Dictionary

In [63]:
import pandas as pd
from typing import Dict
import dask.dataframe as dd
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = pd.DataFrame({"id":[0,1,2], "value": (["A", "B", "C"])})
map_dict = {"A": "Apple", "B": "Banana", "C": "Carrot"}

sdf = spark.createDataFrame(df)
ddf = dd.from_pandas(df, npartitions=2)

**Pandas**

In [64]:
df["value"].map(map_dict)

0     Apple
1    Banana
2    Carrot
Name: value, dtype: object

**Spark**

In [67]:
from pyspark.sql.functions import col, create_map, lit
from itertools import chain

mapping_expr = create_map([lit(x) for x in chain(*map_dict.items())])
sdf.withColumn("value", mapping_expr[col("value")]).show()

+---+------+
| id| value|
+---+------+
|  0| Apple|
|  1|Banana|
|  2|Carrot|
+---+------+



**Dask**

In [69]:
ddf["value"].map(map_dict).compute()

0     Apple
1    Banana
2    Carrot
Name: value, dtype: object

### Apply on Columns

In [74]:
df = pd.DataFrame({"col1":[0,1,2], "col2": ([1,2,3])})
sdf = spark.createDataFrame(df)
ddf = dd.from_pandas(df, npartitions=2)

**Pandas**

In [75]:
def add(x):
   return x+3
df.apply(add)

|   | col1 | col2 |
|---|------|------|
| 0 | 3    | 4    |
| 1 | 4    | 5    |
| 2 | 5    | 6    |


**Spark**

In [83]:
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf

def add(x):
   return x+3

spark_udf = udf(add, IntegerType())
sdf.withColumn('col1',spark_udf("col1")).withColumn('col2',spark_udf("col2")).show()

+----+----+
|col1|col2|
+----+----+
|   3|   4|
|   4|   5|
|   5|   6|
+----+----+



**Dask**

In [85]:
try:
    ddf.apply(add, axis=0).compute()
except Exception as e:
    print(e)

dd.DataFrame.apply only supports axis=1
  Try: df.apply(func, axis=1)


**Why can we only apply on rows?**

In [87]:
from time import sleep 

def minmax(x):
    sleep(1)
    return x.max() - x.min()

df = pd.DataFrame({"col1":[0,1,2], "col2": ([1,2,3])})
df.apply(minmax, axis=1)

0    1
1    1
2    1
dtype: int64

In [88]:
df.apply(minmax, axis=0)

col1    2
col2    2
dtype: int64
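
One way to see why, assuming partitions are row-wise slices as in the Dask DataFrame picture above: a row-wise function only needs the values of a single row, all of which sit in the same partition, while a column-wise function needs every row of the column, and those rows are spread across partitions. The plain-Pandas sketch below imitates two partitions by slicing; the variable names are made up for the example.

```python
import pandas as pd

def minmax(x):
    return x.max() - x.min()

df = pd.DataFrame({"col1": [0, 1, 2], "col2": [1, 2, 3]})
# Imitate two row-wise partitions
parts = [df.iloc[:2], df.iloc[2:]]

# axis=1: applying per partition and concatenating matches the full result
per_partition_rows = pd.concat([p.apply(minmax, axis=1) for p in parts])
print(per_partition_rows.equals(df.apply(minmax, axis=1)))  # True

# axis=0: each partition only sees part of every column,
# so its answer differs from the answer on the full data
print(parts[0].apply(minmax, axis=0).tolist())  # [1, 1]
print(df.apply(minmax, axis=0).tolist())        # [2, 2]
```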

## Inconsistencies of Pandas and Spark

One of the first issues is the inconsistencies between Pandas and Spark. Below is a summary of differences.

<img src="https://miro.medium.com/max/1400/0*fv0FKyt3jB0ehVrU" align="center" width="600"/>

### Setup

In [89]:
import pandas as pd
from pyspark.sql import SparkSession
import dask.dataframe as dd

spark = SparkSession.builder.getOrCreate()

df = pd.DataFrame({"col1": [None, None, "A", "A", "B", "B"], 
                   "col2": [1,2,3,4,5,6]})
ddf = dd.from_pandas(df, npartitions=2)
sdf = spark.createDataFrame(df)
df

|   | col1 | col2 |
|---|------|------|
| 0 |      | 1    |
| 1 |      | 2    |
| 2 | A    | 3    |
| 3 | A    | 4    |
| 4 | B    | 5    |
| 5 | B    | 6    |


### Groupby

**Pandas**

In [11]:
df.groupby("col1")["col2"].mean()

col1
A    3.5
B    5.5
Name: col2, dtype: float64

**Dask**

This is consistent with Pandas in dropping the NULL values.

In [9]:
ddf.groupby("col1")["col2"].mean().compute()

col1
A    3.5
B    5.5
Name: col2, dtype: float64

**Spark**

In [6]:
sdf.groupBy("col1").mean("col2").show()

+----+---------+
|col1|avg(col2)|
+----+---------+
|null|      1.5|
|   B|      5.5|
|   A|      3.5|
+----+---------+



**Additional Note**

For those wondering, you can make Pandas consistent with Spark by passing `dropna=False`.

In [10]:
df.groupby("col1", dropna=False)["col2"].mean()

col1
A      3.5
B      5.5
NaN    1.5
Name: col2, dtype: float64

### Sorting

**Pandas**

In [90]:
df.sort_values(["col1", "col2"])

|   | col1 | col2 |
|---|------|------|
| 2 | A    | 3    |
| 3 | A    | 4    |
| 4 | B    | 5    |
| 5 | B    | 6    |
| 0 |      | 1    |
| 1 |      | 2    |


**Dask**

In [91]:
try:
    ddf.sort_values(["col1", "col2"])
except Exception as e:
    print(e)

Dataframes only support sorting by named columns which must be passed as a string or a list of strings; multi-partition dataframes only support sorting by a single column.
You passed ['col1', 'col2']


**Spark**

In [92]:
sdf.orderBy(["col1", "col2"]).show()

+----+----+
|col1|col2|
+----+----+
|null|   1|
|null|   2|
|   A|   3|
|   A|   4|
|   B|   5|
|   B|   6|
+----+----+



In [20]:
sdf.orderBy(["col1", "col2"], ascending=[False,True]).show()

+----+----+
|col1|col2|
+----+----+
|   B|   5|
|   B|   6|
|   A|   3|
|   A|   4|
|null|   1|
|null|   2|
+----+----+



**Additional Note**

Pandas has an argument called `na_position` that lets you decide where to place NA values when sorting. Pandas places NA values either first or last regardless of sort direction, while Spark treats `None` as the smallest value.

In [93]:
df.sort_values(["col1", "col2"], na_position="first")

|   | col1 | col2 |
|---|------|------|
| 0 |      | 1    |
| 1 |      | 2    |
| 2 | A    | 3    |
| 3 | A    | 4    |
| 4 | B    | 5    |
| 5 | B    | 6    |


## Pitfalls of Distributed Computing

### Inefficient Partitioning

In [94]:
from time import sleep
import numpy as np
import pandas as pd
import dask.dataframe as dd

def delay(df:pd.DataFrame) -> pd.DataFrame:
    sleep(df.shape[0]*3)
    return df.assign(b=df.shape[0])

pdf = pd.DataFrame(range(8), columns=["a"])
pdf

|   | a |
|---|---|
| 0 | 0 |
| 1 | 1 |
| 2 | 2 |
| 3 | 3 |
| 4 | 4 |
| 5 | 5 |
| 6 | 6 |
| 7 | 7 |


In [10]:
%%time
ddf = dd.from_pandas(pdf, npartitions=4)
ddf.map_partitions(delay, meta={"a":"int32","b":"int32"}).compute()

CPU times: user 16.2 ms, sys: 5.59 ms, total: 21.8 ms
Wall time: 6.02 s


|   | a | b |
|---|---|---|
| 0 | 0 | 2 |
| 1 | 1 | 2 |
| 2 | 2 | 2 |
| 3 | 3 | 2 |
| 4 | 4 | 2 |
| 5 | 5 | 2 |
| 6 | 6 | 2 |
| 7 | 7 | 2 |


In [20]:
%%time
pdf = pd.DataFrame(range(4), columns=["a"])
ddf = dd.from_pandas(pdf, npartitions=4)
ddf.map_partitions(delay, meta={"a":"int32","b":"int32"}).compute()

CPU times: user 13.2 ms, sys: 2.78 ms, total: 16 ms
Wall time: 6.01 s


|   | a | b |
|---|---|---|
| 0 | 0 | 1 |
| 1 | 1 | 1 |
| 2 | 2 | 2 |
| 3 | 3 | 2 |

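The wall times above follow from simple arithmetic: `delay` sleeps 3 seconds per row and partitions run in parallel, so the job takes roughly as long as its largest partition. The sketch below assumes at least as many free workers as partitions and ignores scheduling overhead; `estimated_wall_time` is a made-up helper, not a Dask function.

```python
def estimated_wall_time(partition_sizes, seconds_per_row=3):
    """Estimate wall time when every partition runs on its own worker."""
    # All partitions run concurrently, so the slowest one decides the total
    return max(partition_sizes) * seconds_per_row

# 8 rows in 4 partitions of 2 rows each (first run above)
print(estimated_wall_time([2, 2, 2, 2]))  # 6

# 4 rows ended up in partitions of 1, 1, and 2 rows (second run above,
# as shown by the values in column b), so halving the data saved no time
print(estimated_wall_time([1, 1, 2]))  # 6

# An even split of the same 4 rows would take half as long
print(estimated_wall_time([1, 1, 1, 1]))  # 3
```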

### Lineage and Persisting

In [13]:
%%time
def gen_data(df: pd.DataFrame) -> pd.DataFrame:
    sleep(df.shape[0]*3)
    return df.assign(b=np.random.random((df.shape[0], 1)))

pdf = pd.DataFrame([[0],[1],[2],[3],[4],[5],[6],[7]], columns=["a"])
result = gen_data(pdf)
print(result)
print(result.head(3))

   a         b
0  0  0.326501
1  1  0.651844
2  2  0.450825
3  3  0.746542
4  4  0.065295
5  5  0.863471
6  6  0.389724
7  7  0.346177
   a         b
0  0  0.326501
1  1  0.651844
2  2  0.450825
CPU times: user 12.8 ms, sys: 4.94 ms, total: 17.7 ms
Wall time: 24 s


In [15]:
%%time
ddf = dd.from_pandas(pdf, npartitions=4)
result = ddf.map_partitions(gen_data, meta={"a": "int32", "b":"f8"})
print(result.compute())
print(result.head(2))

   a         b
0  0  0.055334
1  1  0.910891
2  2  0.489611
3  3  0.019611
4  4  0.073312
5  5  0.810812
6  6  0.812645
7  7  0.399692
   a         b
0  0  0.276690
1  1  0.846562
CPU times: user 23.4 ms, sys: 5.56 ms, total: 28.9 ms
Wall time: 12 s
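
The Dask timing and the differing random numbers above both come from the same cause: `result` is a recipe (lineage), not data, so `compute()` and `head()` each run `gen_data` again. The toy class below mimics that behaviour in plain Python and shows how keeping the result once, which is what persisting does conceptually, avoids the repeated work. `LazyResult` and its methods are invented for this sketch and are not part of Dask or Spark.

```python
import random

class LazyResult:
    """Toy stand-in for a lazy DataFrame: it stores the recipe, not the data."""

    def __init__(self, func):
        self.func = func
        self.runs = 0        # how many times the recipe was executed
        self._cache = None

    def compute(self):
        if self._cache is not None:
            return self._cache
        self.runs += 1
        return self.func()

    def head(self, n):
        # Without a cache this re-runs the whole recipe
        return self.compute()[:n]

    def persist(self):
        # Run the recipe once and keep the data for later calls
        self._cache = self.compute()
        return self

def gen_data():
    return [random.random() for _ in range(8)]

lazy = LazyResult(gen_data)
full, first = lazy.compute(), lazy.head(2)
print(lazy.runs, full[:2] == first)  # 2 False

kept = LazyResult(gen_data).persist()
full, first = kept.compute(), kept.head(2)
print(kept.runs, full[:2] == first)  # 1 True
```

In both Dask and Spark the corresponding DataFrame method is also called `persist`.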


### Schema Inference

In [21]:
def add_col(df):
    if df["a"].iloc[0] == 7:
        return df.assign(b=None)
    else:
        return df.assign(b=1)
    
pdf = pd.DataFrame(range(8), columns=["a"])
pdf

|   | a |
|---|---|
| 0 | 0 |
| 1 | 1 |
| 2 | 2 |
| 3 | 3 |
| 4 | 4 |
| 5 | 5 |
| 6 | 6 |
| 7 | 7 |


In [22]:
pdf.groupby("a").apply(add_col)

|   | a | b   |
|---|---|-----|
| 0 | 0 | 1.0 |
| 1 | 1 | 1.0 |
| 2 | 2 | 1.0 |
| 3 | 3 | 1.0 |
| 4 | 4 | 1.0 |
| 5 | 5 | 1.0 |
| 6 | 6 | 1.0 |
| 7 | 7 |     |


In [23]:
pdf.groupby("a").apply(add_col).dtypes

a     int64
b    object
dtype: object

In [24]:
ddf = dd.from_pandas(pdf, npartitions=2)
ddf.groupby("a").apply(add_col).dtypes

  Before: .apply(func)
  After:  .apply(func, meta={'x': 'f8', 'y': 'f8'}) for dataframe result
  or:     .apply(func, meta=('x', 'f8'))            for series result
  ddf.groupby("a").apply(add_col).dtypes


a    int64
b    int64
dtype: object

**Double execution time**

In [25]:
%%time
def add_col_2(df):
    if df["a"].iloc[0] == 1:
        sleep(5)
    return df.assign(b=1)

ddf.groupby("a").apply(add_col_2).dtypes

CPU times: user 20.1 ms, sys: 2.36 ms, total: 22.5 ms
Wall time: 5.02 s


  Before: .apply(func)
  After:  .apply(func, meta={'x': 'f8', 'y': 'f8'}) for dataframe result
  or:     .apply(func, meta=('x', 'f8'))            for series result


a    int64
b    int64
dtype: object

In [26]:
%%time
ddf.groupby("a").apply(add_col_2).compute()

  Before: .apply(func)
  After:  .apply(func, meta={'x': 'f8', 'y': 'f8'}) for dataframe result
  or:     .apply(func, meta=('x', 'f8'))            for series result


CPU times: user 45.5 ms, sys: 11.2 ms, total: 56.7 ms
Wall time: 10.1 s


|   | a | b |
|---|---|---|
| 1 | 1 | 1 |
| 4 | 4 | 1 |
| 5 | 5 | 1 |
| 6 | 6 | 1 |
| 7 | 7 | 1 |
| 0 | 0 | 1 |
| 2 | 2 | 1 |
| 3 | 3 | 1 |


## Pandas-like Frameworks

Pandas-like frameworks promise that we can parallelize our code just by changing the import statement.

* [Koalas](https://spark.apache.org/docs/latest/api/python/user_guide/pandas_on_spark/index.html) is a way to use the Spark engine with the Pandas interface. It was renamed to PySpark Pandas (`pyspark.pandas`) in PySpark 3.2.
* [Modin](https://modin.readthedocs.io/en/stable/) is a way to use Dask or Ray with the Pandas interface.

<img src="img/modin_spark.png" align="center" width="800"/>



## Behavior Differences of Pandas-like Frameworks

This is the full code to replicate on Kaggle:

https://www.kaggle.com/code/kvnkho/fugue-is-not-another-pandas-like-framework/edit

### Benchmark Data

We created a DataFrame with the following structure. Columns a and b are string columns. Columns c and d are numerical values. This DataFrame will have 1 million rows (but we will also change it in some cases).

We will create this DataFrame in Pandas, Modin (on Ray), PySpark Pandas, and Dask. For each backend, we will time the operations of different cases. This should be clearer after the first issue is discussed.

<img src="https://miro.medium.com/max/1248/0*K-OSXQShdehsyGN3" align="center" width="600"/>

### Index Location

```python
# case 1
df.head(10)[["c","d"]]

# case 2
df.tail(10)[["c","d"]]

# case 3
df.iloc[:10, [2,3]]

# case 4
df.iloc[-10:, [2,3]]

# case 5
df.iloc[499995:500005, [2,3]]
```

<img src="https://miro.medium.com/max/1400/1*p-cxxzzIVvIrpJvB-wlGJQ.png" align="center" width="600"/>

### Pandas Assumes Shuffle is Cheap

In a distributed setting, data lives on multiple machines. Sometimes, data needs to be rearranged across machines so that each worker has all the data belonging to a logical group. This movement of data is called a shuffle and is an inevitable, but expensive part of working with distributed computing.

Take the two equivalent operations below. The goal is to keep the row with the highest value of c for each value of d. Note that a groupby-max does not preserve the whole row. Case 1 performs a global sort and then drops duplicates to keep the last row. Case 2, on the other hand, uses a groupby-idxmax operation to find the maximum row, and then merges the smaller result back to the original DataFrame. This benchmark used 100k rows instead of 1 million.

```python
# case 1: more shuffle
df.sort_values(["c","d"]).drop_duplicates(subset=["d"], keep="last")

# case 2: less shuffle
idx = df.groupby("d")["c"].idxmax()
df.merge(idx, left_index=True, right_on="c")
```
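
On a small made-up DataFrame with the same column names, the two cases can be checked against each other in plain Pandas. For brevity this sketch selects the rows with `df.loc[idx]` instead of the merge used in case 2; that substitution is an assumption of the example, not the benchmarked code.

```python
import pandas as pd

# Toy data: keep the row with the highest c for each value of d
df = pd.DataFrame({
    "a": ["red", "blue", "red", "blue", "red"],
    "c": [5, 9, 7, 1, 3],
    "d": [1, 1, 2, 2, 2],
})

# Case 1: global sort, then keep the last (largest c) row per d
case1 = df.sort_values(["c", "d"]).drop_duplicates(subset=["d"], keep="last")

# Case 2: find the index label of the max c per group, then select those rows
idx = df.groupby("d")["c"].idxmax()
case2 = df.loc[idx]

# Both keep the same rows (order may differ, so compare after sorting)
print(case1.sort_index().equals(case2.sort_index()))  # True
print(sorted(case1.index.tolist()))  # [1, 2]
```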

<img src="https://miro.medium.com/max/1176/1*R6MswImUP_aGS0ZYmuCIDQ.png" align="center" width="600"/>

### Pandas Assumes the Index is Beneficial

One of the core concepts ingrained in the Pandas mindset is the index. If a user comes from a Pandas background, they assume that the index is beneficial and it’s worth setting or resetting it. Let’s see how this translates to other backends.

Take the code snippet below. We filter for a given group and then calculate the sum of those records. Case 1 has no index, and case 2 uses an index.

```python
# case 1: without index
df[df["a"]=="red"]["c"].sum()

# case 2: with "a" as index
idf = df.set_index("a")
idf.loc["red"]["c"].sum()
```

<img src="https://miro.medium.com/max/1340/1*DKfyar9dPMPvYMFV0KjhBA.png" align="center" width="600"/>

### Eager versus Lazy Evaluation (Part One)

Lazy evaluation is a key feature of distributed computing frameworks. When calling operations on a DataFrame, a computation graph is constructed. The operations only happen when an action is performed that needs the data.

```python
# case 1: read file and min of all columns 
backend.read_parquet(path).min()

# case 2: read file and min of two columns 
backend.read_parquet(path)[["c0","c1"]].min()
```

<img src="https://miro.medium.com/max/1340/1*9nEbwyjj3zeH-_e6rjSUbA.png" align="center" width="600"/>

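Why can a lazy backend be faster in case 2? Because nothing runs until the result is requested, the framework can inspect the whole plan first and load only the columns that are used. The toy class below imitates that idea in plain Python; `LazyTable`, `FAKE_FILE`, and the column values are invented for the sketch and do not correspond to any real backend API.

```python
# Hypothetical "file" with four columns; reading a column has a cost
FAKE_FILE = {"c0": [3, 1, 2], "c1": [9, 8, 7], "c2": [5, 5, 5], "c3": [0, 4, 2]}

class LazyTable:
    """Records operations first and only touches the data in min()."""

    def __init__(self, source, columns=None):
        self.source = source
        self.columns = columns  # None means "all columns"
        self.columns_read = 0

    def __getitem__(self, columns):
        # Selecting columns only updates the plan; nothing is read yet
        return LazyTable(self.source, list(columns))

    def min(self):
        # The action: read only what the plan needs, then compute
        wanted = self.columns or list(self.source)
        self.columns_read = len(wanted)
        return {name: min(self.source[name]) for name in wanted}

all_cols = LazyTable(FAKE_FILE)
print(all_cols.min(), all_cols.columns_read)

two_cols = LazyTable(FAKE_FILE)[["c0", "c1"]]
print(two_cols.min(), two_cols.columns_read)  # {'c0': 1, 'c1': 7} 2
```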
### Eager versus Lazy Evaluation (Part Two)

Here, we see a case where eager evaluation helps users. But when practitioners don’t understand lazy evaluation, it also becomes very easy to run into duplicated work.

See the following cases. Case 1 just gets the min of two columns, while case 2 gets the min, max, and mean.

```python
# case 1: min of 2 columns
sub = backend.read_parquet(path)[["c0","c1"]]
sub.min()

# case 2: min, max, and mean of 2 columns
sub = backend.read_parquet(path)[["c0","c1"]]
sub.min()
sub.max()
sub.mean()
```

<img src="https://miro.medium.com/max/1208/1*4WzV-AxMry102o22PuiGkQ.png" align="center" width="600"/>

## Does the import magic hold up?

In [33]:
import pandas as pd
import numpy as np 
import pyspark.pandas as ks

### Interoperability with other libraries

There is [this StackOverflow post](https://stackoverflow.com/questions/70510056/use-of-koalas-instead-of-pandas/70530911#70530911) from someone asking how to do `np.where` in Koalas.

**Pandas**

In [50]:
df = pd.DataFrame({"a": [1,2,3], "b": [1,2,3]})
df["c"] = np.where(df['a'] > 2, 1, 0)
df

|   | a | b | c |
|---|---|---|---|
| 0 | 1 | 1 | 0 |
| 1 | 2 | 2 | 0 |
| 2 | 3 | 3 | 1 |


**Koalas**

In [36]:
df = ks.DataFrame({"a": [1,2,3], "b": [1,2,3]})

try: 
    df["c"] = np.where(df['a'] > 2, 1, 0)
except Exception as e:
    print(e)

The method `pd.Series.__iter__()` is not implemented. If you want to collect your data as an NumPy array, use 'to_numpy()' instead.


**Koalas with helper function**

In [49]:
df = ks.DataFrame({"a": [1,2,3], "b": [1,2,3]})

# your custom function
def numpy_where(s, cond, action1, action2):
  return s.where(cond, action2).where(~cond, action1)

# create sample new column
df['c'] = numpy_where(df['a'], df['a'] > 2, 1, 0)
df

|   | a | b | c |
|---|---|---|---|
| 0 | 1 | 1 | 0 |
| 1 | 2 | 2 | 0 |
| 2 | 3 | 3 | 1 |


### API Parity of Koalas and Modin

**Pandas**

In [52]:
df = pd.DataFrame({'a':[1,2,3],'b':[3,4,5]})
df.assign(c=pd.Series([1,2,3]))

|   | a | b | c |
|---|---|---|---|
| 0 | 1 | 3 | 1 |
| 1 | 2 | 4 | 2 |
| 2 | 3 | 5 | 3 |


**Koalas**

Koalas can't use `assign` this way, as documented [here](https://koalas.readthedocs.io/en/latest/reference/api/databricks.koalas.DataFrame.assign.html).

In [59]:
df = ks.DataFrame({'a':[1,2,3],'b':[3,4,5]})
try:
    df.assign(c=ks.Series([1,2,3]))
except Exception as e:
    print(type(e))

**Modin and Default to Pandas**

https://modin.readthedocs.io/en/stable/supported_apis/defaulting_to_pandas.html

## Introducing Fugue