<div style="color:#006666; padding:0px 10px; border-radius:5px; font-size:18px;"><h1 style='margin:10px 5px'>Making Pandas Faster</h1>
</div>

© Copyright Machine Learning Plus

<div class="alert alert-info" style="background-color:#006666; color:white; padding:0px 10px; border-radius:5px;"><h2 style='margin:10px 5px'>1. Numba</h2>
</div>

Numba is an open source JIT compiler that translates a subset of Python and NumPy code into fast machine code.

__When to use__

Numba is best suited to the following use cases:
- Mathematical operations
- NumPy code
- Multiple loops

__Why numba__

You still get the comfort of writing your code in Python.

__How does Numba work__

Numba reads the Python bytecode for a decorated function and combines this with information about the types of the input arguments to the function. It analyzes and optimizes your code, and finally uses the LLVM compiler library to generate a machine code version of your function, tailored to your CPU capabilities. This compiled version is then used every time your function is called. - __numba doc__

In [None]:
# Install numba
# !pip install numba==0.53.1

In [None]:
# Import libraries
import numba
from numba import jit
print(numba.__version__)

__Task__

Get the list of prime numbers from 0 to 100.

__Without numba__

In [None]:
# Import libraries
import numpy as np
import time

In [None]:
# Define function to extract prime numbers from a given list using simple python
def check_prime(y):
    prime_numbers = []
    for num in y:
        flag = False
        if num > 1:
            # check for factors
            for i in range(2, num):
                if (num % i) == 0:
                    # if factor is found, set flag to True
                    flag = True
                    # break out of loop
                    break
            if flag == False:
                prime_numbers.append(num)
                
    return prime_numbers

Let's calculate the time taken to perform the task

In [None]:
# List of 0 to 100
x = np.arange(100)
x

# Plain Python has no compilation step, so a single timing is already representative
start = time.time()
check_prime(x)
end = time.time()
print("Elapsed (first run) = %s" % (end - start))


# Time a second run for comparison with the Numba version later on
start = time.time()
check_prime(x)
end = time.time()
print("Elapsed (second run) = %s" % (end - start))

In a notebook, the `%timeit` magic is the better choice because it runs the function many times in a loop, which gives a more accurate estimate of the execution time of short functions.

In [None]:
%timeit check_prime(x)
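Outside a notebook the `%timeit` magic is not available, but the standard-library `timeit` module measures in a similar way. A minimal sketch, assuming a small stand-in function `count_primes` (hypothetical, not part of the original notebook):

```python
import timeit

def count_primes(limit):
    """Count primes below `limit` by trial division (illustrative stand-in)."""
    count = 0
    for num in range(2, limit):
        if all(num % i for i in range(2, num)):
            count += 1
    return count

# Run the call 200 times per measurement, take 5 measurements, keep the best one
timings = timeit.repeat(lambda: count_primes(100), number=200, repeat=5)
best_per_call = min(timings) / 200
print(f"best of 5: {best_per_call * 1e6:.1f} microseconds per call")
```

Taking the minimum of several repeats reduces the influence of other processes that happen to run during a measurement.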

__With Numba: the JIT decorator__

To benchmark Numba-compiled functions, it is important to time them without including the compilation step, since the compilation of a given function will only happen once for each set of input types, but the function will be called many times.

In [None]:
# Define function to extract prime numbers from a given list using jit decorator and nopython = True mode

@jit(nopython=True)
def check_prime(y):
    prime_numbers = []
    for num in y:
        flag = False
        if num > 1:
            # check for factors
            for i in range(2, num):
                if (num % i) == 0:
                    # if factor is found, set flag to True
                    flag = True
                    # break out of loop
                    break
            if flag == False:
                prime_numbers.append(num)
                
    return np.array(prime_numbers)

Let's calculate the time taken to perform the task

In [None]:
# DO NOT REPORT THIS... COMPILATION TIME IS INCLUDED IN THE EXECUTION TIME!
start = time.time()
check_prime(x)
end = time.time()
print("Elapsed (with compilation) = %s" % (end - start))


# NOW THE FUNCTION IS COMPILED, RE-TIME IT EXECUTING FROM CACHE
start = time.time()
check_prime(x)
end = time.time()
print("Elapsed (after compilation) = %s" % (end - start))

In a notebook, the `%timeit` magic is the better choice because it runs the function many times in a loop, which gives a more accurate estimate of the execution time of short functions.

In [None]:
%timeit check_prime(x)

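After the first (compiling) call, the Numba version should run much faster than the pure Python one. Compare the two `%timeit` results to see the gain on your machine.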

__What is nopython mode__

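When you set `nopython=True`, you ask Numba to compile the function so that it runs without the Python interpreter at all. If the function cannot be compiled that way, Numba raises an error instead of silently running slower code. This is the recommended, best-practice way to use the `jit` decorator, as it leads to the best performance.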

### Define the function signature to gain more speed

Numba provides various data types, so you can define the function signature, that is, the data types of the inputs and of the returned output. You can declare arrays as well, for example `int32[:]` for a one-dimensional array of 32-bit integers. Often this helps to achieve a small speed gain.

In [None]:
from numba import int32

In [None]:
# Define function to extract prime numbers from a given list using jit decorator and nopython = True mode
@jit(int32[:](int32[:]), nopython=True)
def check_prime(y):
    prime_numbers = []
    for num in y:
        flag = False
        if num > 1:
            # check for factors
            for i in range(2, num):
                if (num % i) == 0:
                    # if factor is found, set flag to True
                    flag = True
                    # break out of loop
                    break
            if flag == False:
                prime_numbers.append(num)
    return np.array(prime_numbers)

In [None]:
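# The signature above accepts only int32 arrays, so set the dtype explicitly
# (np.arange defaults to int64 on most platforms)
x = np.arange(100, dtype=np.int32)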

In [None]:
%timeit check_prime(x)

<div class="alert alert-info" style="background-color:#006666; color:white; padding:0px 10px; border-radius:5px;"><h2 style='margin:10px 5px'>2. Dask </h2>
</div>

Dask provides efficient parallelization for data analytics in Python. Dask DataFrames allow you to work with large datasets, for both data manipulation and building ML models, with only minimal code changes. It is open source and works well with Python libraries like NumPy, scikit-learn, etc.

__Why Dask__

Pandas is not sufficient when the data grows bigger than what fits in RAM.

You may use Spark or Hadoop to solve this. But these are not Python-native environments, which makes it harder to use NumPy, scikit-learn, pandas, TensorFlow, and the other commonly used Python libraries for ML.

This is where Dask comes to the rescue. It also scales up to clusters.

__Install Dask and graphviz__

In [None]:
# !pip install dask==2021.05.0
# !pip install graphviz==0.16
# !conda install python-graphviz

[Known problems installing graphviz on windows](https://stackoverflow.com/questions/35064304/runtimeerror-make-sure-the-graphviz-executables-are-on-your-systems-path-aft)

In [None]:
# Import dask
import dask
print(dask.__version__)

__Parallel Processing with Dask__

__Task__

- Apply a discount of 20% to 2 products worth 100 and 200 respectively and generate a total bill

__Function without dask__

Let's define the functions. Since the task is very small, I am adding a sleep of 1 second to every function.

In [None]:
from time import sleep

# Define functions to apply discount, get the total of 2 products, get the final price of 2 products
def apply_discount(x):
    sleep(1)
    x=x-0.2*x
    return x

def get_total_price(a,b):
    sleep(1)
    return a+b

Let's calculate the total bill and note the time taken for the task. I am using the `%%time` cell magic to measure it.

In [None]:
%%time

product1 = apply_discount(100)
product2 = apply_discount(200)
total_bill = get_total_price(product1, product2)

The total time taken for the above task is about 3 seconds, since the three calls each sleep for 1 second and run one after another. Let's use Dask and check the time taken.

__Function with dask__

Use the `delayed` function from Dask to reduce the time.

In [None]:
# Import dask.delayed
from dask import delayed

In [None]:
%%time

# Wrapping the function calls using dask.delayed
product1 = delayed(apply_discount)(100)                            # no work has happened yet
product2 = delayed(apply_discount)(200)                            # no work has happened yet
total_bill = delayed(get_total_price)(product1, product2)          # no work has happened yet

In [None]:
total_bill

As you can see, the total time taken with the delayed wrapper is only 374 µs. But the work hasn't happened yet. The delayed wrapper creates a delayed object that keeps track of all the functions to call and the arguments to pass to them. Basically, it has built a task graph that describes the entire computation. You don't have the output yet.

Most Dask workloads are lazy, that is, they don’t start any work until you explicitly trigger them with a call to `compute()`.
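The idea of recording calls first and running them later can be imitated in a few lines of plain Python. This is only a toy sketch to illustrate laziness, assuming a hypothetical `Lazy` class. It is not how Dask is implemented and it runs nothing in parallel:

```python
class Lazy:
    """Toy stand-in for a delayed object: remembers a call, runs it on demand."""

    def __init__(self, func, *args):
        self.func = func
        self.args = args

    def compute(self):
        # Resolve arguments that are themselves lazy, then call the function
        resolved = [a.compute() if isinstance(a, Lazy) else a for a in self.args]
        return self.func(*resolved)


calls = []  # records when real work happens

def discount(x):
    calls.append("discount")
    return x - 0.2 * x

def add(a, b):
    calls.append("add")
    return a + b

# Building the graph calls none of the functions
bill = Lazy(add, Lazy(discount, 100), Lazy(discount, 200))
calls_before = list(calls)

# The functions run only when compute() is called
result = bill.compute()
print(calls_before, result, calls)
```

`calls_before` stays empty because constructing `bill` only stores the functions and their arguments.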

So let's use `compute()` to get the output

In [None]:
total_bill.compute()

Now you have the output. This operation also took some time. Let's measure the total time taken.

In [None]:
%%time

# Wrapping the function calls using dask.delayed
product1 = delayed(apply_discount)(100)
product2 = delayed(apply_discount)(200)
total_bill = delayed(get_total_price)(product1, product2)

total_bill.compute()

The total time taken is 2.01 seconds. That's 1 second less than the original functions. Any idea how `dask.delayed` did this?

You can see the task graph created by Dask by calling the `visualize()` method. Let's see.

In [None]:
# Visualize the total_bill object
total_bill.visualize()

Clearly, from the above image, you can see two independent calls to `apply_discount()`. Since neither depends on the other, Dask executes them simultaneously, which saves the time of one call.
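The same scheduling idea, two independent calls running side by side followed by a step that needs both results, can be sketched with the standard library. This uses `concurrent.futures` rather than Dask, and the sleeps are shortened to 0.2 seconds (an assumption made only to keep the example quick):

```python
import time
from concurrent.futures import ThreadPoolExecutor

def apply_discount(x):
    time.sleep(0.2)  # shortened stand-in for the 1-second sleep
    return x - 0.2 * x

def get_total_price(a, b):
    time.sleep(0.2)
    return a + b

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=2) as pool:
    # The two discount calls do not depend on each other, so submit both at once
    future1 = pool.submit(apply_discount, 100)
    future2 = pool.submit(apply_discount, 200)
    # The total needs both results, so it waits for them
    total_bill = get_total_price(future1.result(), future2.result())
elapsed = time.perf_counter() - start

print(total_bill, round(elapsed, 2))
```

The elapsed time should be close to two sleeps rather than three, mirroring the saving seen with `dask.delayed`.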

This was one of the most basic use cases of Dask. Let's look at another use case.

__Parallel Processing in a loop with dask__

You can use Dask for parallel processing inside a loop. Let's understand it using an example.

Task:
- For each number from 0 to 5, add its square and its double, then sum the results.

In [None]:
# Functions to perform mathematics operations
def square(x):
    return x*x

def double(x):
    return x*2

def add(x, y):
    return x + y

# For loop that calls the above functions for each data
output = []
for i in range(6):
    a = delayed(square)(i)
    b = delayed(double)(i)
    c = delayed(add)(a, b)
    output.append(c)

total = dask.delayed(sum)(output)
print(total.compute())

Let's visualize the task graph now

In [None]:
# Visualizing the task graph for the problem
total.visualize()

This was one way of using `dask.delayed`. Another way is to use it as a decorator while defining the functions.

In [None]:
# Using delayed as a decorator to achieve parallel computing.

@delayed
def square(x):
    return x*x

@delayed
def double(x):
    return x*2

@delayed
def add(x, y):
    return x + y

# No change has to be done in function calls
output = []
for i in range(6):
    a = square(i)
    b = double(i)
    c = add(a, b)
    output.append(c)

total = dask.delayed(sum)(output)
total.visualize()

You now have a good understanding of how to parallelize tasks using Dask. But what if you have a huge dataset that you can't load into memory? Dask DataFrames to the rescue.

You can simply load the dataset as a `dask.dataframe` instead. And, if you want, you can convert it to a pandas DataFrame after the necessary wrangling/calculations are done.

### How is dask.dataframe different from pandas.dataframe?

A Dask DataFrame is a large parallel DataFrame composed of many smaller pandas DataFrames, split along the index.

These Pandas DataFrames may live on disk for larger-than-memory computing on a single machine, or on many different machines in a cluster. One Dask DataFrame operation triggers many operations on the constituent Pandas DataFrames.
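The "one operation becomes many small operations" idea can be imitated with plain pandas. A minimal sketch with made-up values, where the partitioning is done by hand (Dask handles this internally, and its actual mechanics are not shown here):

```python
import pandas as pd

# A small frame standing in for a dataset too large to process at once
df = pd.DataFrame({"Fare": [7.25, 71.28, 7.92, 53.10, 8.05, 512.33]})

# Split along the index into three partitions, each an ordinary pandas DataFrame
partitions = [df.iloc[start:start + 2] for start in range(0, len(df), 2)]

# One logical operation becomes one small operation per partition
partial_maxima = [part["Fare"].max() for part in partitions]

# A cheap final step combines the partial results
overall_max = max(partial_maxima)
print(partial_maxima, overall_max)
```

Because each partition is processed independently, the per-partition steps are the ones that can run in parallel or on different machines.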

<img src="https://www.machinelearningplus.com/wp-content/uploads/2020/11/dask-df.png" alt="Dask Dataframe">

Without wasting much time, let's start with the implementation of dask dataframes

First let's import dask.dataframe as dd

In [None]:
# Import dask dataframe as dd
import dask
import dask.dataframe as dd

Here I am reading the Titanic dataset from a local CSV file with `dd.read_csv()`. It works much like pandas' `read_csv()`.

In [None]:
data_frame = dd.read_csv("Datasets/Titanic.csv")
data_frame

In [None]:
type(data_frame)

The last line of the cell is the `data_frame` object, so pandas would have printed the DataFrame. But here you can see that only the structure is shown and no data has been printed. That's because Dask DataFrames are lazy and do not perform operations unless necessary. You can use the `head()` method to look at the first rows.

In [None]:
data_frame.head()

In [None]:
data_frame

__Task__
- Get the maximum value of Fare column

In [None]:
# Use max() to get the maximum value of the Fare column
max_value = data_frame.Fare.max()
max_value

As I said earlier, Dask is lazy, so call `compute()` to get the value.

In [None]:
max_value.compute()

Let's visualize it as well. You will see the level of parallel processing it has done

In [None]:
# Visualize the max_value
max_value.visualize()

__Task__
- Get the standard deviation of the Fare column grouped by the Survived column

In [None]:
# Applying groupby operation
df = data_frame.groupby('Survived').Fare.std()
df

In [None]:
# Compute the values
df.compute()

Let's visualize it as well. You will see the level of parallel processing it has done

In [None]:
# Visualize the groupby result
df.visualize()

<div class="alert alert-info" style="background-color:#006666; color:white; padding:0px 10px; border-radius:5px;"><h2 style='margin:10px 5px'>3. Modin</h2>
</div>

__Modin : Scale your pandas workflow by changing a single line of code__

Modin is a Python library that can be used to handle large datasets using parallelization. It uses Ray or Dask to provide an effortless way to speed up the operations.

The syntax is similar to pandas, and its performance has made it a promising solution. All you have to do is change just one line of code.

#### Why do you need modin?

Pandas works very well when you work with a smaller dataset that fits in RAM. But, in practice, you will have to work on huge datasets (several GBs or larger). In such cases, pandas may not cut it. Pandas is designed to work on a single core. Even though most of our machines have multiple CPU cores, pandas doesn't use them.

This is where Modin comes in. You don't need a new set of syntax to start using Modin. It is capable of speeding up your pandas scripts by up to 4x.

Let's get started and install modin

In [None]:
# Install Modin dependencies and Dask to run on Dask

# !pip install -U pandas
# !pip install "modin[dask]"
# !pip install "dask[distributed]"

Modin also allows you to choose which engine to use for computation. The environment variable `MODIN_ENGINE` is used for this. The code below shows how to specify the computation engine.

In [None]:
import os
# os.environ["MODIN_ENGINE"] = "ray"  # Modin will use Ray
os.environ["MODIN_ENGINE"] = "dask"  # Modin will use Dask

By default, Modin will use all of the cores available on your system. 

But if you want to keep some cores free for other tasks, you can limit the number of CPUs Modin uses with the code below.

In [None]:
import os
os.environ["MODIN_CPUS"] = "4"
import modin.pandas as pd

Let's look at the implementation of modin

__Task__
- Load the dataset using pandas and modin, compare the total time taken

I am going to use `pandas_pd` for pandas and `pd` for Modin (imported above as `modin.pandas`). First, let's load the data using pandas.

In [None]:
# Load Pandas and time
import pandas as pandas_pd
import time

In [None]:
# Load csv file using pandas
%time  pandas_df = pandas_pd.read_csv("Datasets/large_dataset.csv")

Initialize the Dask client.

In [None]:
# For Dask backend
from dask.distributed import Client
client = Client()

In [None]:
# Load csv file using modin pd
%time  modin_df = pd.read_csv("Datasets/large_dataset.csv")

The total time taken is 1 min 10 seconds. The time taken might vary with the specification of your system. We were able to save only a few seconds on such a small task. Imagine how much time we can save while working on a bigger dataset with a lot of computations.

Let's perform a few tasks using Modin.

__Task__
- Group both dataframes by EngineVersion, count the rows, and compare the time

In [None]:
# pandas df group by
%time pandas_df.groupby(['EngineVersion']).count()

In [None]:
# modin df group by
%time modin_df.groupby(['EngineVersion']).count()

The time taken by the Modin df is less than that of the pandas df. Let's look at another task.

__Task__
- Fill the missing values in both dataframes with 0 and compare the time

In [None]:
# pandas df fillna
%time pandas_df.fillna(0)

In [None]:
# modin df fillna
%time modin_df.fillna(0)

The time taken by the Modin df is much less than that of the pandas df.

As I said earlier, to use Modin all you have to do is change a single line of code, i.e. import `modin.pandas` instead of `pandas`.

<div class="alert alert-info" style="background-color:#006666; color:white; padding:0px 10px; border-radius:5px;"><h2 style='margin:10px 5px'>4. Swifter</h2>
</div>

The Swifter package efficiently applies any function to a pandas DataFrame or Series in the fastest available manner. It is integrated with pandas and Modin.

`apply` is a convenient way to run a function over a DataFrame, and Swifter can speed it up considerably by picking the fastest available way to run it.

It is advised to use vectorized functions when working with Swifter.

__Task__: 

On a DataFrame of random integers, create a new column that contains a score computed per row (the row maximum minus the row minimum).

In [None]:
# !pip install -U swifter==1.0.7

In [None]:
# Import the package
import numpy as np
import pandas as pd
import swifter
print(swifter.__version__)

In [None]:
import modin.pandas as mpd
swifter.register_modin()

In [None]:
np.random.seed(100)
df = pd.DataFrame(np.random.randint(1, 1000, size=(1_000_000, 6)), columns=['A', 'B', 'C', 'D', 'E', 'F'])
df.head()

__Without Swifter__

In [None]:
%%time 
df['score'] = df.apply(lambda x: x.max()-x.min(), axis=1)

In [None]:
df.head()

Let's see how much time swifter takes

__With Swifter__

Importing the Swifter package automatically registers it with pandas. It can now be used through the `swifter` accessor together with pandas methods such as `apply`.

In [None]:
%%time 
df['score'] = df.swifter.apply(lambda x: x.max()-x.min(), axis=1)

In [None]:
df.head()

__Turn off progress bar__

In [None]:
%%time 
df['score'] = df.swifter.progress_bar(False).apply(lambda x: x.max()-x.min(), axis=1)

In [None]:
df.head()

It is advised to use the Swifter package with vectorized functions. Let's see the execution time with and without vectorized functions.

### Non vectorised function

__Task__: 

The government plans to refund the ticket price of the Titanic passengers. Passengers who died will get 100x the fare, while passengers who survived will get 10x the fare. Compute a fare refund column in the dataset.

In [None]:
# Load the dataset
df = pd.read_csv('Datasets/Titanic.csv')

In [None]:
df.head()

In [None]:
# Define the function to calculate the refund fare
def fare_refund(x):
    if x['Survived'] == 0:
        return x['Fare'] * 100
    else:
        return x['Fare'] * 10

In [None]:
# Use pandas apply
%time df['Fare_refund'] = df[['Survived','Fare']].apply(fare_refund, axis =1)

In [None]:
# Use swifter apply
%time df['Fare_refund'] = df[['Survived','Fare']].swifter.apply(fare_refund, axis =1)

__Note:__ 

The time taken by the Swifter package is higher than that of pandas. That's because, for a non-vectorized function, Swifter cannot use its vectorized path and falls back to Dask parallel processing, whose overhead likely outweighs the gain on a dataset this small.

Let's see what changes when the function is rewritten as a vectorized function using NumPy.

In [None]:
import numpy as np

def fare_refund_vectorized(x):
    return np.where(x['Survived'] ==0, x['Fare'] * 100, x['Fare'] * 10)

In [None]:
# Use pandas apply
%time df['Fare_refund'] = df[['Survived','Fare']].apply(fare_refund_vectorized, axis =1)

In [None]:
# Use swifter apply
%time df['Fare_refund'] = df[['Survived','Fare']].swifter.apply(fare_refund_vectorized, axis =1)

__Note:__ 

With vectorized functions, Swifter is much faster than pandas `apply`. So it's advised to use vectorized functions with Swifter.
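For reference, "vectorized" means the function works on whole columns in one call instead of on one row at a time. A minimal sketch with made-up rows that calls `np.where` directly on the columns, without `apply` or Swifter:

```python
import numpy as np
import pandas as pd

# Made-up rows using the two Titanic column names from the task
toy = pd.DataFrame({"Survived": [0, 1, 0], "Fare": [7.25, 71.0, 8.0]})

# One call over entire columns: no Python-level loop over rows
toy["Fare_refund"] = np.where(toy["Survived"] == 0, toy["Fare"] * 100, toy["Fare"] * 10)
print(toy)
```

When a function can be written this way, calling it on the columns directly is usually the simplest fast option.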

__Important Notes__ - latest documentation, might change later

- Please upgrade your version of pandas, as the pandas extension api used in this module is a recent addition to pandas.
- Do not use swifter to apply a function that modifies external variables. Under the hood, swifter does sample applies to optimize performance. These sample applies will modify the external variable in addition to the final apply. Thus, you will end up with an erroneously modified external variable.
- It is advised to disable the progress bar if calling swifter from a forked process as the progress bar may get confused between various multiprocessing modules.
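The warning about external variables is easier to see with a small imitation. The sketch below uses a hypothetical helper, `apply_with_sample`, that mimics the "sample apply, then final apply" pattern described in the notes. It is not Swifter's actual code:

```python
seen = []  # external state that the applied function mutates

def record_and_double(value):
    seen.append(value)  # side effect on an external variable
    return value * 2

def apply_with_sample(func, values, sample_size=2):
    """Hypothetical helper: trial-run on a sample, then apply to everything."""
    for v in values[:sample_size]:  # sample pass, results discarded
        func(v)
    return [func(v) for v in values]  # final pass

result = apply_with_sample(record_and_double, [1, 2, 3])
print(result)
print(seen)
```

The returned values are correct, but `seen` contains the sampled items twice, which is the kind of erroneous modification the note warns about.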

<div class="alert alert-info" style="background-color:#006666; color:white; padding:0px 10px; border-radius:5px;"><h2 style='margin:10px 5px'>5. Vaex</h2>
</div>

Vaex is a high performance Python library for lazy Out-of-Core DataFrames (similar to Pandas), to visualize and explore big tabular datasets. 

Vaex uses __memory mapping__, __zero memory copy policy__ and __lazy computations__ for best performance (no memory wasted).

__Why vaex__

1. Performance: works with huge tabular data, processes >10^9 rows/second
2. Lazy / Virtual columns: compute on the fly, without wasting RAM
3. Memory efficient: no memory copies when doing filtering/selections/subsets.
4. Visualization: directly supported, a one-liner is often enough.
5. User friendly API: you will only need to deal with the DataFrame object, and tab completion + docstring will help you out, feels very similar to Pandas.
6. Jupyter integration: vaex-jupyter will give you interactive visualization and selection in the Jupyter notebook and Jupyter lab.



__Task__

Load Trips data and do various data wrangling operations

__Using Pandas__

IPython >= 7 is required for Vaex

In [None]:
# !pip install vaex
# !pip install IPython==7.0.0

In [None]:
# Import libraries
import vaex
import pandas as pd
import numpy as np
import time
print(vaex.__version__)

In [None]:
%%time
df = pd.read_csv('Datasets/yellow_tripdata_2020-01.csv')
df.head()

__Using Vaex__

In [None]:
%%time
df = vaex.open('Datasets/yellow_tripdata_2020-01.csv')
df.head()

In [None]:
df.head()

Explicitly set the dtype and get rid of the warning.

In [None]:
%%time
df = vaex.open('Datasets/yellow_tripdata_2020-01.csv', dtype={'store_and_fwd_flag': 'str'})
df.head()

Alternatively, convert it to HDF5 format while reading. The data gets stored on disk in HDF5 format, so later loads from the HDF5 file will be nearly instantaneous.

In [None]:
%%time
df = vaex.open('Datasets/yellow_tripdata_2020-01.csv', 
                convert='Datasets/yellow_tripdata_2020-01.hdf5')
df.head()

### Memory Mapping

Read from HDF5 instead. The file opens instantly. The speed gain is huge because nothing is copied to memory; the data is only memory mapped.

When you open a memory-mapped file with Vaex, you don't actually read the data. Vaex swiftly reads only the file metadata (like the location of the data on disk, number of rows, number of columns, column names and types) and the file description.

So, you can open these files instantly, irrespective of how much RAM you have. This works for the Apache Arrow and HDF5 formats.
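Memory mapping itself is not specific to Vaex. A minimal sketch of the idea using `numpy.memmap` on a temporary raw binary file (the file name and contents are made up for illustration, and this is not how Vaex reads HDF5):

```python
import os
import tempfile
import numpy as np

# Write one million float64 values to a raw binary file on disk
path = os.path.join(tempfile.mkdtemp(), "fares.bin")
np.arange(1_000_000, dtype=np.float64).tofile(path)

# Opening the memory map only records where the data lives; nothing is loaded yet
fares = np.memmap(path, dtype=np.float64, mode="r", shape=(1_000_000,))

# Only the part of the file backing the requested slice is read from disk
print(fares[500_000:500_003])
```

Opening the map takes about the same time whether the file holds a thousand values or a billion, which is why memory-mapped files appear to load instantly.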

In [None]:
%%time
df = vaex.open('Datasets/yellow_tripdata_2020-01.hdf5')
df.head()

In [None]:
df.info()

Convert it to a pandas DataFrame or an Arrow table.

In [None]:
type(df)

__To pandas dataframe__

In [None]:
df.to_pandas_df()

__To arrow table__

In [None]:
arrow_table = df.to_arrow_table()
arrow_table

__Selection and Filtering__

Instead of making copies, Vaex internally keeps track of which rows are selected. Vaex has a no-copy policy.

In [None]:
df_positive_tip = df[df.tip_amount > 0]
df_positive_tip[['VendorID', 'passenger_count', 'fare_amount', 'tip_amount']]

__Another way to do filtering__


In [None]:
df.select(df.tip_amount > 0)
df.evaluate(['fare_amount', 'tip_amount'], selection=True)

Extract the numpy arrays.

In [None]:
fare, tip = df.evaluate(['fare_amount', 'tip_amount'], selection=True)
print(fare)
print(tip)

__Virtual Columns__

In [None]:
df['fare_per_unit'] = df['fare_amount'] / df['trip_distance']

In [None]:
df

Let's try to replace the infinite values.

Filtering the rows to replace and then assigning a value to them does not work the way it does in pandas.

In [None]:
df['fare_per_unit'].isinf()

In [None]:
df2 = df[df.fare_per_unit.isinf()]
df2

In [None]:
# Does not work! 
df2.fare_per_unit = 0

In [None]:
df2

Use a custom function instead.

__Custom Function__

Make sure the returned object is of the same length as the input.

In [None]:
# WRONG WAY -returned value should be of same length as input.
@vaex.register_function()
def make_zero(ar):
    return 0

In [None]:
# RIGHT WAY
@vaex.register_function()
def make_zero(ar):
    return np.zeros(len(ar))
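The same-length rule can be tried out on a plain NumPy array before involving Vaex. A minimal sketch, assuming a hypothetical function `replace_inf_with_zero` and made-up fare and distance values:

```python
import numpy as np

def replace_inf_with_zero(ar):
    """Return an array of the same length with infinite entries set to 0."""
    return np.where(np.isinf(ar), 0.0, ar)

fare = np.array([6.0, 7.0, 3.5])
distance = np.array([1.2, 0.0, 0.7])

# Division by zero yields inf; silence NumPy's warning for this demo
with np.errstate(divide="ignore"):
    fare_per_unit = fare / distance

cleaned = replace_inf_with_zero(fare_per_unit)
print(cleaned)
```

Unlike `make_zero`, this variant keeps the finite values and only zeroes the infinite ones, while still returning one output element per input element.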

In [None]:
# Calling the function alone only builds an expression; df2 is not modified
df2.func.make_zero(df.fare_per_unit)

In [None]:
df2['fare_per_unit'] = df2.func.make_zero(df.fare_per_unit)

In [None]:
df2.head()

__Apply Function__

In [None]:
# Single column apply
df['fare_log'] = df.fare_amount.apply(lambda x: np.log(x).round(2))
df

When the computation is based on multiple columns, pass the columns involved as a list to the `arguments` parameter.

In [None]:
# Multi Column Apply

def tip_pct(x, y):
    return np.round(x/y, 2)

df['tip_pct'] = df.apply(tip_pct, arguments=[df.tip_amount, df.total_amount])
df

In [None]:
df = df.fillna(0, column_names=['fare_per_unit'])
df

__Do computations__

In [None]:
%%time
df['total_amount'].mean()

In [None]:
%%time
df['VendorID'].value_counts()

In [None]:
%%time
df.groupby(by='VendorID').agg({'trip_distance': 'sum', 'tip_amount': 'mean'})

You can also visualize the data with Vaex.

In [None]:
%%time
df.viz.histogram(df['total_amount']);

For more documentation and tutorials refer to https://vaex.io/docs/index.html

<div class="alert alert-info" style="background-color:#006666; color:white; padding:0px 10px; border-radius:5px;"><h2 style='margin:10px 5px'>6. Ray</h2>
</div>

Ray is a distributed execution framework that makes it easy to scale your applications and to leverage state of the art machine learning libraries.

Ray provides a simple, universal API for building distributed applications.

Ray accomplishes this mission by:
- Providing simple primitives for building and running distributed applications.
- Enabling end users to parallelize single machine code, with little to zero code changes.
- Including a large ecosystem of applications, libraries, and tools on top of the core Ray to enable complex applications.

Ray works very well with Python and Java. Integration with C++ is underway as well (still in the experimental stage).

In [None]:
import ray
import pandas as pd
import time
import numpy as np

__Task__

Get the list of prime numbers between 2 and 100,000.

__Without ray__

In [None]:
def check_prime(num):
    flag = False
    if num > 1:
        # check for factors
        for i in range(2, num):
            if (num % i) == 0:
                # if factor is found, set flag to True
                flag = True
                # break out of loop
                break
        if flag == False:
            return num              
        else:
            return np.nan

In [None]:
# Time to execute
start = time.time()

prime_numbers = []
for num in range(2,100000):
    prime_numbers.append(check_prime(num))
output = [num for num in prime_numbers if str(num) != 'nan']

end = time.time()
print("Elapsed (plain Python) = %s" % (end - start))

__With Ray__

In [None]:
# Start Ray.
ray.init()

@ray.remote
def check_prime(num):
    flag = False
    if num > 1:
        # check for factors
        for i in range(2, num):
            if (num % i) == 0:
                # if factor is found, set flag to True
                flag = True
                # break out of loop
                break
        if flag == False:
            return num              
        else:
            return np.nan

In [None]:
# Time to execute
start = time.time()

prime_numbers = []
for num in range(2,100000):
    prime_numbers.append(check_prime.remote(num))
output = ray.get(prime_numbers)
output = [num for num in output if str(num) != 'nan']

end = time.time()
print("Elapsed (with Ray) = %s" % (end - start))

In [None]:
ray.shutdown()
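The loop above submits one remote task per number, and every task carries some scheduling overhead. A common mitigation is to give each task a batch of numbers instead. The sketch below shows only the batching logic in plain Python, with hypothetical helper names and a smaller range. Wrapping `primes_in_chunk` in a remote task is left as an assumption and is not shown:

```python
def is_prime(num):
    """Trial division, the same test as check_prime."""
    if num < 2:
        return False
    return all(num % i for i in range(2, num))

def primes_in_chunk(chunk):
    """Work for one task: test a whole batch of numbers."""
    return [n for n in chunk if is_prime(n)]

def make_chunks(start, stop, size):
    """Split [start, stop) into consecutive ranges of at most `size` numbers."""
    return [range(lo, min(lo + size, stop)) for lo in range(start, stop, size)]

# A handful of batches instead of one task per number
chunks = make_chunks(2, 2000, 500)

# Each call below is the unit of work that could become one remote task
results = [primes_in_chunk(c) for c in chunks]
primes = [p for batch in results for p in batch]
print(len(chunks), len(primes))
```

Batching trades some load balancing for far fewer tasks; the right batch size depends on the workload and the machine.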

<div class="alert alert-info" style="background-color:#006666; color:white; padding:0px 10px; border-radius:5px;"><h2 style='margin:10px 5px'>7. Cython</h2>
</div>

__When to use Cython__

Once you have written your code in python and you want to make it run as fast as possible, you might want to use Cython. 

Python is a dynamically typed programming language. 

That is, to create a variable, say `x=5`, you simply state that. You don't have to tell Python what type of data `x` is going to hold. Under the hood, Python does the job of figuring out the `type` for you.

Likewise, you may change `x="some text"` at a later point. Your code will still work, thanks to Python. This is true for all the variables that you create, including the loop counters when you write a for-loop.

However, this comes at a cost in speed. This may often go unnoticed initially. But as you scale and expand your code, like making a website or building software or an ML application, it will start to matter.

This is where Cython comes to help.

By making small changes to your code, like adding the type definitions of the variables, you can use Cython to compile your Python code (Cythonize) so that it runs faster.

We will see how to Cythonize regular Python code and then see how to Cythonize Python code that uses NumPy arrays. Since pandas DataFrames are essentially built on top of NumPy arrays, it's possible to Cythonize pandas code as well.

__What you will need__

To use Cython you will need a C compiler in your system, like `gcc`, which you probably will have already.

In [None]:
# Install Cython
# !pip install cython

In [None]:
import cython  
print(cython.__version__)

0.29.23


In [None]:
%load_ext Cython

__Task__

Define a variable.

In [None]:
# python 
a = 5
print(a)

5


In [None]:
%%cython
cdef int a = 10                   # integer
cdef double b                     # float
cdef bint c = False               # boolean

cdef str d = "some text"          # text
cdef list e = [1,2,3,4]           # list
cdef dict f = {"a": 10}           # dictionary


print(a, type(a))
print(b, type(b))
print(c, type(c))
print(d, type(d))
print(e, type(e))
print(f, type(f))

10 <class 'int'>
0.0 <class 'float'>
False <class 'bool'>
some text <class 'str'>
[1, 2, 3, 4] <class 'list'>
{'a': 10} <class 'dict'>


__Task__

Calculate factorial of 5000

__Without cython__

In [None]:
def factorial(x):
    y = 1
    for i in range(1, x+1):
        y *= i
    return y

In [None]:
# Time to execute
import time
start = time.time()

factorial(5000)

end = time.time()
print("Elapsed = %s" % (end - start))

Elapsed = 0.009999990463256836


In [None]:
# Time to execute
start = time.time()

factorial(50_000)

end = time.time()
print("Elapsed = %s" % (end - start))

Elapsed = 0.7819967269897461


In [None]:
# Time to execute: 500_000
start = time.time()

factorial(500_000)

end = time.time()
print("Elapsed = %s" % (end - start))

Elapsed = 114.55501103401184


__With cython__

In [None]:
%%cython
cpdef int c_factorial(int x):
    cdef int y = 1
    cdef int i
    for i in range(1, x+1):
        y *= i
    return y

In [None]:
# Time to execute
start = time.time()

c_factorial(50000)

end = time.time()
print("Elapsed = %s" % (end - start))

Elapsed = 0.0


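The call returns faster than `time.time()` can resolve. Keep in mind, though, that `y` is declared as a C `int`, so the product overflows long before `x` reaches 50,000 and the returned value is not the true factorial. Part of the speedup comes from replacing Python's arbitrary-precision integers with fixed-width C arithmetic.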
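To see why a C `int` cannot hold these factorials, the sketch below imitates fixed-width arithmetic in plain Python. It assumes a 32-bit `int` whose overflow wraps around, which is typical in practice but not guaranteed by the C standard; `wrapped_factorial` is a hypothetical helper:

```python
def wrapped_factorial(x, bits=32):
    """Factorial computed the way a fixed-width signed integer would hold it."""
    modulus = 1 << bits
    y = 1
    for i in range(1, x + 1):
        y = (y * i) % modulus  # keep only the low `bits` bits
    # Reinterpret the bit pattern as a signed value
    return y - modulus if y >= modulus // 2 else y

print(wrapped_factorial(12))  # 479001600, still exact
print(wrapped_factorial(13))  # no longer equals 13!
print(wrapped_factorial(34))  # 0, because 34! is divisible by 2**32
```

From 34 onward the wrapped result stays at 0, so a fair comparison with the Python version would need a type that can hold the full result.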

In [None]:
# Time to execute: 500_000
start = time.time()

c_factorial(500_000)

end = time.time()
print("Elapsed = %s" % (end - start))

Elapsed = 0.002002716064453125


Still only about 2 ms.

In [None]:
# Time to execute: add two more zeros - 50_000_000
start = time.time()

c_factorial(50_000_000)

end = time.time()
print("Elapsed (with compilation) = %s" % (end - start))

Elapsed (with compilation) = 0.04900550842285156


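Now the time becomes measurable: about 49 ms for 50 million iterations. In plain Python, `factorial(500_000)` alone took almost two minutes. Keep in mind that the comparison is not like for like: `c_factorial` multiplies fixed-width C integers, which overflow long before these input sizes, whereas Python computes the exact arbitrary-precision result.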
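`c_factorial` keeps its running product in a C `int`, which a factorial outgrows almost immediately. The sketch below emulates that in pure Python, assuming a 32-bit `int` (typical, but platform-dependent) and two's-complement wraparound (signed overflow is formally undefined in C). `wrapped_factorial` is a hypothetical helper, not part of the notebook.

```python
import ctypes

def wrapped_factorial(x):
    """Factorial with the running product truncated to a signed 32-bit int."""
    y = 1
    for i in range(1, x + 1):
        # c_int32 performs no overflow check, so the value wraps around
        y = ctypes.c_int32(y * i).value
    return y

print(wrapped_factorial(12))   # 479001600, still exact
print(wrapped_factorial(13))   # no longer equal to 13! = 6227020800
print(wrapped_factorial(34))   # 0, because 34! is divisible by 2**32
```

Under these assumptions every input from 34 upward yields 0, so the Cython timings measure a loop of cheap machine-integer multiplications rather than an exact big-number factorial.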

## __Cythonize Pandas Code__

Let's define a function that performs a computation on a pandas DataFrame, then try to reduce its run time using Cython.

In [None]:
import pandas as pd
df = pd.read_csv("Datasets/yellow_tripdata_2020-01.csv", 
                 dtype={'store_and_fwd_flag': 'str'}, 
                 parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime'],
                 low_memory=False)

df = df.head(100000).copy()  # 100k rows; copy so new columns are not added to a slice
df.head()

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge
0,1.0,2020-01-01 00:28:15,2020-01-01 00:33:03,1.0,1.2,1.0,N,238,239,1.0,6.0,3.0,0.5,1.47,0.0,0.3,11.27,2.5
1,1.0,2020-01-01 00:35:39,2020-01-01 00:43:04,1.0,1.2,1.0,N,239,238,1.0,7.0,3.0,0.5,1.5,0.0,0.3,12.3,2.5
2,1.0,2020-01-01 00:47:41,2020-01-01 00:53:52,1.0,0.6,1.0,N,238,238,1.0,6.0,3.0,0.5,1.0,0.0,0.3,10.8,2.5
3,1.0,2020-01-01 00:55:23,2020-01-01 01:00:14,1.0,0.8,1.0,N,238,151,1.0,5.5,0.5,0.5,1.36,0.0,0.3,8.16,0.0
4,2.0,2020-01-01 00:01:58,2020-01-01 00:04:16,1.0,0.0,1.0,N,193,193,2.0,3.5,0.5,0.5,0.0,0.0,0.3,4.8,0.0


In [None]:
# Scale the distance by 10 and round to an integer; non-positive distances become 1
df['trip_distance'] = df['trip_distance'].map(lambda x: round(x*10) if x > 0 else 1)
df.head()

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge
0,1.0,2020-01-01 00:28:15,2020-01-01 00:33:03,1.0,12,1.0,N,238,239,1.0,6.0,3.0,0.5,1.47,0.0,0.3,11.27,2.5
1,1.0,2020-01-01 00:35:39,2020-01-01 00:43:04,1.0,12,1.0,N,239,238,1.0,7.0,3.0,0.5,1.5,0.0,0.3,12.3,2.5
2,1.0,2020-01-01 00:47:41,2020-01-01 00:53:52,1.0,6,1.0,N,238,238,1.0,6.0,3.0,0.5,1.0,0.0,0.3,10.8,2.5
3,1.0,2020-01-01 00:55:23,2020-01-01 01:00:14,1.0,8,1.0,N,238,151,1.0,5.5,0.5,0.5,1.36,0.0,0.3,8.16,0.0
4,2.0,2020-01-01 00:01:58,2020-01-01 00:04:16,1.0,1,1.0,N,193,193,2.0,3.5,0.5,0.5,0.0,0.0,0.3,4.8,0.0


__Plain Python__

Function to determine driver commission as a function of the fare, commission rate and trip distance.

In [None]:
def formula(factor, fare):
    return factor * fare

def driver_commission(rate=0.2, fare_amount=None, trip_distance=None):
    commission = rate*fare_amount
    for i in range(1, (trip_distance+1)):
        factor = .001
        commission += formula(factor, fare_amount)
    return round(commission,2)

# Call and check
print(driver_commission(.2, 10, 12))
print(driver_commission(.3, 10, 12))
print(driver_commission(.4, 10, 12))
print(driver_commission(.4, 10, 20))
print(driver_commission(.4, 10, 30))
print(driver_commission(.4, 10, 40))
print(driver_commission(.4, 10, 50))

2.12
3.12
4.12
4.2
4.3
4.4
4.5
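These values can be checked by hand. The loop adds `0.001 * fare_amount` once per unit of `trip_distance`, so for non-negative integer distances the result equals `fare_amount * (rate + 0.001 * trip_distance)` before rounding. A small sketch of that closed form (the name `commission_closed_form` is hypothetical and used only for this check):

```python
def commission_closed_form(rate, fare_amount, trip_distance):
    """Closed form of the loop: one 0.001 * fare term per distance unit."""
    return round(fare_amount * (rate + 0.001 * trip_distance), 2)

# Same arguments as the calls above
cases = [(.2, 12), (.3, 12), (.4, 12), (.4, 20), (.4, 30), (.4, 40), (.4, 50)]
results = [commission_closed_form(rate, 10, distance) for rate, distance in cases]
print(results)
```

The loop is kept in the notebook because it gives Cython a repeated computation to speed up.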


Check time of `driver_commission` for one instance.

In [None]:
%%timeit -n 1
driver_commission(.2, 10, 12)

6.76 µs ± 3.01 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)


Time taken for the entire dataframe

In [None]:
%%timeit -r 1
df['driver_commission'] = df.apply(lambda x: driver_commission(rate=.2, 
                                             fare_amount=x["fare_amount"], 
                                             trip_distance=x["trip_distance"]), 
                                   axis=1)

4.02 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


__Simple Cythonize__

In [None]:
%%cython
def formula(factor, fare):
    return factor * fare

def driver_commission_cy(rate=0.2, fare_amount=None, trip_distance=None):
    commission = rate*fare_amount
    for i in range(1, (trip_distance+1)):
        factor = .001
        commission += formula(factor, fare_amount)
    return round(commission,2)

Check the time taken by `driver_commission_cy` for one instance.

In [None]:
%%timeit
driver_commission_cy(.2, 10, 12)

2.67 µs ± 72 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)


Time taken on the entire dataframe

In [None]:
%%timeit
df['driver_commission_cy'] = df.apply(lambda x: driver_commission_cy(rate=.2, 
                                                fare_amount=x["fare_amount"], 
                                                trip_distance=x["trip_distance"]), 
                                       axis=1)

3.3 s ± 88 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [None]:
df.head()

__Type definitions__

In [None]:
%%cython
cimport numpy as np
import numpy as np

cdef double formula(double factor, double fare):
    return factor * fare

cpdef double driver_commission_typedef(double rate, double fare_amount, int trip_distance):
    cdef double commission = rate * fare_amount
    cdef int i
    cdef double factor = 0.001
    for i in range(1, (trip_distance+1)):
        commission += formula(factor, fare_amount)
    return round(commission,2)

In [None]:
%%timeit
driver_commission_typedef(.2, 10, 12)

1.07 µs ± 99.8 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)


In [None]:
%%timeit -r 1
df['driver_commission_typedef'] = df.apply(lambda x: driver_commission_typedef(rate=.2, 
                                                         fare_amount=x["fare_amount"], 
                                                         trip_distance=x["trip_distance"]), 
                                           axis=1)

2.97 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


In [None]:
df.head()

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,...,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,driver_commission,driver_commission_cy,driver_commission_typedef
0,1.0,2020-01-01 00:28:15,2020-01-01 00:33:03,1.0,12,1.0,N,238,239,1.0,...,3.0,0.5,1.47,0.0,0.3,11.27,2.5,1.27,1.27,1.27
1,1.0,2020-01-01 00:35:39,2020-01-01 00:43:04,1.0,12,1.0,N,239,238,1.0,...,3.0,0.5,1.5,0.0,0.3,12.3,2.5,1.48,1.48,1.48
2,1.0,2020-01-01 00:47:41,2020-01-01 00:53:52,1.0,6,1.0,N,238,238,1.0,...,3.0,0.5,1.0,0.0,0.3,10.8,2.5,1.24,1.24,1.24
3,1.0,2020-01-01 00:55:23,2020-01-01 01:00:14,1.0,8,1.0,N,238,151,1.0,...,0.5,0.5,1.36,0.0,0.3,8.16,0.0,1.14,1.14,1.14
4,2.0,2020-01-01 00:01:58,2020-01-01 00:04:16,1.0,1,1.0,N,193,193,2.0,...,0.5,0.5,0.0,0.0,0.3,4.8,0.0,0.7,0.7,0.7


## __Cythonize the apply function__

Note: Each `%%cython` cell is compiled as a separate module, so every import and helper function it uses must be defined inside that same cell.

In [None]:
%%cython
cimport cython
cimport numpy as np
import numpy as np

cdef double formula(double factor, double fare):
    return factor * fare


cpdef double driver_commission_typedef(double rate, double fare_amount, long trip_distance):
    cdef double commission = rate*fare_amount
    cdef int i
    cdef double factor = 0.001
    for i in range(1, (trip_distance+1)):
        commission += formula(factor, fare_amount)
    return round(commission,2)


@cython.boundscheck(False)
@cython.wraparound(False)
cpdef np.ndarray apply_driver_commission(np.ndarray col_fare_amount, 
                                                 np.ndarray col_trip_distance):
    cdef int i, n = len(col_trip_distance)
    cdef np.ndarray[double] res = np.zeros(n)
    assert len(col_fare_amount) == len(col_trip_distance) == n
    for i in range(n):
        res[i] = driver_commission_typedef(rate=0.2, 
                                           fare_amount=col_fare_amount[i],
                                           trip_distance=col_trip_distance[i])
    return res

In [None]:
%%timeit
df['driver_commission_capply'] = apply_driver_commission(df['fare_amount'].to_numpy(), df['trip_distance'].to_numpy())

119 ms ± 2.74 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
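To put the timings in perspective, the snippet below divides the earlier run times by this one. The numbers are the single-machine figures reported above for the 100k-row DataFrame, so the ratios are indicative only.

```python
# Run times reported above for the 100k-row DataFrame, in seconds
timings = {
    "plain Python + df.apply": 4.02,
    "simple Cython + df.apply": 3.3,
    "typed Cython + df.apply": 2.97,
    "Cython loop over NumPy arrays": 0.119,
}

baseline = timings["Cython loop over NumPy arrays"]
speedups = {name: round(seconds / baseline, 1) for name, seconds in timings.items()}

for name, factor in speedups.items():
    print(f"{name}: {factor}x the run time of the Cython loop")
```

Typing the function alone gives a modest gain while the row-wise `df.apply` call remains. Most of the improvement appears once `df.apply` is replaced by the compiled loop.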


In [None]:
df.head()

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,...,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,driver_commission,driver_commission_cy,driver_commission_typedef,driver_commission_capply
0,1.0,2020-01-01 00:28:15,2020-01-01 00:33:03,1.0,12,1.0,N,238,239,1.0,...,0.5,1.47,0.0,0.3,11.27,2.5,1.27,1.27,1.27,1.27
1,1.0,2020-01-01 00:35:39,2020-01-01 00:43:04,1.0,12,1.0,N,239,238,1.0,...,0.5,1.5,0.0,0.3,12.3,2.5,1.48,1.48,1.48,1.48
2,1.0,2020-01-01 00:47:41,2020-01-01 00:53:52,1.0,6,1.0,N,238,238,1.0,...,0.5,1.0,0.0,0.3,10.8,2.5,1.24,1.24,1.24,1.24
3,1.0,2020-01-01 00:55:23,2020-01-01 01:00:14,1.0,8,1.0,N,238,151,1.0,...,0.5,1.36,0.0,0.3,8.16,0.0,1.14,1.14,1.14,1.14
4,2.0,2020-01-01 00:01:58,2020-01-01 00:04:16,1.0,1,1.0,N,193,193,2.0,...,0.5,0.0,0.0,0.3,4.8,0.0,0.7,0.7,0.7,0.7


__Reload the full dataset and apply the function again__

In [None]:
df = pd.read_csv("Datasets/yellow_tripdata_2020-01.csv", 
                 dtype={'store_and_fwd_flag': 'str'}, 
                 parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime'],
                 low_memory=False)

In [None]:
# Scale the distance by 10 and round to an integer; non-positive distances become 1
df['trip_distance'] = df['trip_distance'].map(lambda x: round(x*10) if x > 0 else 1)

In [None]:
df.shape

(6405008, 18)

In [None]:
%%timeit -r 1
df['driver_commission_capply'] = apply_driver_commission(df['fare_amount'].to_numpy(), df['trip_distance'].to_numpy())

7.19 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
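As a rough scaling check, the per-row cost can be derived from the two reported `apply_driver_commission` timings. The extrapolation for `df.apply` is an assumption: it presumes run time grows linearly with row count and was not measured in this notebook.

```python
rows_sample, rows_full = 100_000, 6_405_008

# Reported timings in seconds
capply_sample, capply_full = 0.119, 7.19
apply_sample = 4.02  # plain Python function via df.apply on the 100k sample

us_per_row_sample = capply_sample / rows_sample * 1e6
us_per_row_full = capply_full / rows_full * 1e6
print(f"Cython loop: {us_per_row_sample:.2f} us/row (sample), "
      f"{us_per_row_full:.2f} us/row (full)")

# Assumed linear extrapolation of df.apply to the full dataset
apply_full_estimate = apply_sample / rows_sample * rows_full
print(f"df.apply estimate for the full dataset: {apply_full_estimate:.0f} s")
```

The per-row cost also depends on the `trip_distance` values, since they set the loop length, so the two samples are only approximately comparable.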


In [None]:
df.head()

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,driver_commission_capply
0,1.0,2020-01-01 00:28:15,2020-01-01 00:33:03,1.0,12,1.0,N,238,239,1.0,6.0,3.0,0.5,1.47,0.0,0.3,11.27,2.5,1.27
1,1.0,2020-01-01 00:35:39,2020-01-01 00:43:04,1.0,12,1.0,N,239,238,1.0,7.0,3.0,0.5,1.5,0.0,0.3,12.3,2.5,1.48
2,1.0,2020-01-01 00:47:41,2020-01-01 00:53:52,1.0,6,1.0,N,238,238,1.0,6.0,3.0,0.5,1.0,0.0,0.3,10.8,2.5,1.24
3,1.0,2020-01-01 00:55:23,2020-01-01 01:00:14,1.0,8,1.0,N,238,151,1.0,5.5,0.5,0.5,1.36,0.0,0.3,8.16,0.0,1.14
4,2.0,2020-01-01 00:01:58,2020-01-01 00:04:16,1.0,1,1.0,N,193,193,2.0,3.5,0.5,0.5,0.0,0.0,0.3,4.8,0.0,0.7
