### Let's load the Divvy bike dataset using regular pandas.

In [1]:
import pandas as pd
import time

def load_data(filename):
    # Time only the CSV read itself
    start = time.time()
    df = pd.read_csv(filename)
    print(f"time to run query {time.time()-start}")
    return df

df = load_data("data/202004-divvy-tripdata.csv")

time to run query 0.21261382102966309
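
A single wall-clock reading of about 0.2 seconds can be skewed by disk caching or other activity on the machine. As a minimal sketch, assuming you want a steadier baseline before comparing against Bodo, the hypothetical helper `time_call` below (not part of pandas or Bodo) repeats a call and keeps the best time. It uses a stand-in workload so it runs without the CSV file; in the notebook you would pass `pd.read_csv` and the file path instead.

```python
import time


def time_call(func, *args, repeats=3):
    """Run func(*args) `repeats` times; return (last result, best elapsed seconds)."""
    best = float("inf")
    result = None
    for _ in range(repeats):
        start = time.perf_counter()  # monotonic, high-resolution clock
        result = func(*args)
        best = min(best, time.perf_counter() - start)
    return result, best


# Stand-in workload (hypothetical); replace with pd.read_csv and a file path.
total, seconds = time_call(sum, range(100_000))
print(f"best of 3: {seconds:.6f} s")
```

Taking the minimum rather than the mean is a common choice for micro-timings, since noise from the system only ever adds time.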


### Let's take a quick look at the resulting dataframe

In [4]:
df

Unnamed: 0,ride_id,rideable_type,started_at,ended_at,start_station_name,start_station_id,end_station_name,end_station_id,start_lat,start_lng,end_lat,end_lng,member_casual
0,A847FADBBC638E45,docked_bike,2020-04-26 17:45:14,2020-04-26 18:12:03,Eckhart Park,86,Lincoln Ave & Diversey Pkwy,152.0,41.8964,-87.6610,41.9322,-87.6586,member
1,5405B80E996FF60D,docked_bike,2020-04-17 17:08:54,2020-04-17 17:17:03,Drake Ave & Fullerton Ave,503,Kosciuszko Park,499.0,41.9244,-87.7154,41.9306,-87.7238,member
2,5DD24A79A4E006F4,docked_bike,2020-04-01 17:54:13,2020-04-01 18:08:36,McClurg Ct & Erie St,142,Indiana Ave & Roosevelt Rd,255.0,41.8945,-87.6179,41.8679,-87.6230,member
3,2A59BBDF5CDBA725,docked_bike,2020-04-07 12:50:19,2020-04-07 13:02:31,California Ave & Division St,216,Wood St & Augusta Blvd,657.0,41.9030,-87.6975,41.8992,-87.6722,member
4,27AD306C119C6158,docked_bike,2020-04-18 10:22:59,2020-04-18 11:15:54,Rush St & Hubbard St,125,Sheridan Rd & Lawrence Ave,323.0,41.8902,-87.6262,41.9695,-87.6547,casual
...,...,...,...,...,...,...,...,...,...,...,...,...,...
84771,200E9CDFC5685AA0,docked_bike,2020-04-16 16:10:16,2020-04-16 16:23:11,Dearborn Pkwy & Delaware Pl,140,Dearborn Pkwy & Delaware Pl,140.0,41.8990,-87.6299,41.8990,-87.6299,member
84772,F58A8F2ABCB5D95B,docked_bike,2020-04-30 17:56:12,2020-04-30 18:15:21,Kimbark Ave & 53rd St,322,Cottage Grove Ave & 51st St,351.0,41.7996,-87.5947,41.8030,-87.6066,casual
84773,A3754693A80E4913,docked_bike,2020-04-24 19:57:33,2020-04-24 21:50:43,Sedgwick St & Schiller St,236,Wells St & Elm St,182.0,41.9076,-87.6386,41.9032,-87.6343,casual
84774,D610CABB67F7B744,docked_bike,2020-04-02 17:59:55,2020-04-02 18:42:26,Damen Ave & Charleston St,310,Damen Ave & Charleston St,310.0,41.9201,-87.6779,41.9201,-87.6779,casual


# Now let's bodoize this code
 Simply add <b>import bodo</b> and the <b>@bodo.jit</b> decorator to the function that needs to be converted.
 <br>Bodo will automatically compile and optimize the code. Additionally, it leverages MPI to make the single-threaded pandas code distributed and parallel.
 <br>Let's do the conversion below.

In [2]:
import pandas as pd
import time
import bodo

# cache=True asks Bodo to cache the compiled function for later calls
@bodo.jit(cache=True)
def load_data_bodo(filename):
    start = time.time()
    df = pd.read_csv(filename)
    print(f"time to run query {time.time()-start}")
    return df

df = load_data_bodo("data/202004-divvy-tripdata.csv")

time to run query 0.429638


# Hmm, why is this taking time to start?
  A small upfront premium for lifelong returns.
 <br>The compilation done by Bodo can take some time, depending on the complexity of the code, the data schema and the number of columns.
 <br>But this is a one-time cost. Notice the <b>cache=True</b> argument: it tells Bodo to cache the output of the compilation. As long as the function (code) and the data schema don't change (the actual data can be different),
    <br>Bodo will not recompile, and later calls to the function skip the compilation step.


# Let's run the cell below and see what happens when we call the Bodo function again

In [3]:
df=load_data_bodo("data/202004-divvy-tripdata.csv")

time to run query 0.265491


# Let's quickly inspect the dataframe

In [4]:
df

Unnamed: 0,ride_id,rideable_type,started_at,ended_at,start_station_name,start_station_id,end_station_name,end_station_id,start_lat,start_lng,end_lat,end_lng,member_casual
0,A847FADBBC638E45,docked_bike,2020-04-26 17:45:14,2020-04-26 18:12:03,Eckhart Park,86,Lincoln Ave & Diversey Pkwy,152,41.8964,-87.6610,41.9322,-87.6586,member
1,5405B80E996FF60D,docked_bike,2020-04-17 17:08:54,2020-04-17 17:17:03,Drake Ave & Fullerton Ave,503,Kosciuszko Park,499,41.9244,-87.7154,41.9306,-87.7238,member
2,5DD24A79A4E006F4,docked_bike,2020-04-01 17:54:13,2020-04-01 18:08:36,McClurg Ct & Erie St,142,Indiana Ave & Roosevelt Rd,255,41.8945,-87.6179,41.8679,-87.6230,member
3,2A59BBDF5CDBA725,docked_bike,2020-04-07 12:50:19,2020-04-07 13:02:31,California Ave & Division St,216,Wood St & Augusta Blvd,657,41.9030,-87.6975,41.8992,-87.6722,member
4,27AD306C119C6158,docked_bike,2020-04-18 10:22:59,2020-04-18 11:15:54,Rush St & Hubbard St,125,Sheridan Rd & Lawrence Ave,323,41.8902,-87.6262,41.9695,-87.6547,casual
...,...,...,...,...,...,...,...,...,...,...,...,...,...
84771,200E9CDFC5685AA0,docked_bike,2020-04-16 16:10:16,2020-04-16 16:23:11,Dearborn Pkwy & Delaware Pl,140,Dearborn Pkwy & Delaware Pl,140,41.8990,-87.6299,41.8990,-87.6299,member
84772,F58A8F2ABCB5D95B,docked_bike,2020-04-30 17:56:12,2020-04-30 18:15:21,Kimbark Ave & 53rd St,322,Cottage Grove Ave & 51st St,351,41.7996,-87.5947,41.8030,-87.6066,casual
84773,A3754693A80E4913,docked_bike,2020-04-24 19:57:33,2020-04-24 21:50:43,Sedgwick St & Schiller St,236,Wells St & Elm St,182,41.9076,-87.6386,41.9032,-87.6343,casual
84774,D610CABB67F7B744,docked_bike,2020-04-02 17:59:55,2020-04-02 18:42:26,Damen Ave & Charleston St,310,Damen Ave & Charleston St,310,41.9201,-87.6779,41.9201,-87.6779,casual


# We saw how simple it was to take sample pandas code and bodoize it.
Now, let's run the bodoized code on a cluster.
<br> We will leverage IPyParallel to create a 2-core cluster.
<br> Run the example below to create the cluster. The Bodo community version allows you to scale up to 8 cores without a license.
<br> With an enterprise license, Bodo can scale to 1000s of cores.

In [5]:
import ipyparallel as ipp
import psutil

# cpu_count(logical=False) can return None when the physical core count is unknown
n = min(psutil.cpu_count(logical=False) or 1, 2)
rc = ipp.Cluster(engines='mpi', n=n).start_and_connect_sync(activate=True)

Starting 2 engines with <class 'ipyparallel.cluster.launcher.MPIEngineSetLauncher'>
100%|████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 2/2 [00:18<00:00,  9.28s/engine]


# Now we have a cluster with 2 cores. You can set n to any other value (based on the number of cores available)
To execute Bodo code and Python code against this cluster, simply use the %%px magic.
<br>Bodo can easily scale up to 1000s of cores; the community license allows you to go up to 8 cores.

In [7]:
%%px
import pandas as pd
import time
import bodo
@bodo.jit(cache=True)
def load_data_bodo(filename):
    start=time.time()
    df=pd.read_csv(filename)
    print(f"time to run query {time.time()-start}")
    return df
df=load_data_bodo("data/202004-divvy-tripdata.csv")

[stdout:0] time to run query 0.159680


# Great, we have run parallel, distributed Bodo code on a cluster with 2 ranks.
Each process that is part of a Bodo (MPI) cluster is called a rank. Bodo ensures that code and data are evenly distributed across these ranks for faster, scalable and distributed data processing.
<br> Notice that the reported time dropped from about 0.27 seconds for the cached single-process run to about 0.16 seconds on 2 ranks.
<br> The gain is modest, probably because there is not much data in this one file. What if we passed the whole "data/" folder?
<br>Before we do that, let's quickly inspect the data in the ranks.

In [7]:
%%px --targets 0
df

Unnamed: 0,ride_id,rideable_type,started_at,ended_at,start_station_name,start_station_id,end_station_name,end_station_id,start_lat,start_lng,end_lat,end_lng,member_casual
0,A847FADBBC638E45,docked_bike,2020-04-26 17:45:14,2020-04-26 18:12:03,Eckhart Park,86,Lincoln Ave & Diversey Pkwy,152,41.8964,-87.6610,41.9322,-87.6586,member
1,5405B80E996FF60D,docked_bike,2020-04-17 17:08:54,2020-04-17 17:17:03,Drake Ave & Fullerton Ave,503,Kosciuszko Park,499,41.9244,-87.7154,41.9306,-87.7238,member
2,5DD24A79A4E006F4,docked_bike,2020-04-01 17:54:13,2020-04-01 18:08:36,McClurg Ct & Erie St,142,Indiana Ave & Roosevelt Rd,255,41.8945,-87.6179,41.8679,-87.6230,member
3,2A59BBDF5CDBA725,docked_bike,2020-04-07 12:50:19,2020-04-07 13:02:31,California Ave & Division St,216,Wood St & Augusta Blvd,657,41.9030,-87.6975,41.8992,-87.6722,member
4,27AD306C119C6158,docked_bike,2020-04-18 10:22:59,2020-04-18 11:15:54,Rush St & Hubbard St,125,Sheridan Rd & Lawrence Ave,323,41.8902,-87.6262,41.9695,-87.6547,casual
...,...,...,...,...,...,...,...,...,...,...,...,...,...
42383,BA59EF067C2B3C7D,docked_bike,2020-04-04 15:39:57,2020-04-04 15:49:49,Bissell St & Armitage Ave,113,Lincoln Ave & Belmont Ave,131,41.9184,-87.6522,41.9394,-87.6684,member
42384,9883C40CD32C41E5,docked_bike,2020-04-30 19:32:38,2020-04-30 20:05:15,Bissell St & Armitage Ave,113,Noble St & Milwaukee Ave,29,41.9184,-87.6522,41.9007,-87.6626,member
42385,D3D9AEF82AFD6ACB,docked_bike,2020-04-20 17:10:55,2020-04-20 17:33:55,Aberdeen St & Monroe St,80,Wells St & Concord Ln,289,41.8804,-87.6555,41.9121,-87.6347,member
42386,2B96FA1E906AB83B,docked_bike,2020-04-12 14:04:44,2020-04-12 14:50:38,Wood St & Milwaukee Ave,61,Honore St & Division St,17,41.9077,-87.6726,41.9031,-87.6739,casual


In [8]:
%%px --targets 1
df

Unnamed: 0,ride_id,rideable_type,started_at,ended_at,start_station_name,start_station_id,end_station_name,end_station_id,start_lat,start_lng,end_lat,end_lng,member_casual
42388,BD2F53183F2FC7C8,docked_bike,2020-04-09 16:36:51,2020-04-09 16:50:33,Lincoln Ave & Roscoe St,230,Sheridan Rd & Buena Ave,306,41.9433,-87.6710,41.9585,-87.6550,member
42389,B4FBC5AF5B031A30,docked_bike,2020-04-11 12:45:01,2020-04-11 13:15:35,Sheffield Ave & Willow St,93,Clark St & Winnemac Ave,325,41.9137,-87.6529,41.9733,-87.6678,member
42390,1C279DD521E3B2D5,docked_bike,2020-04-11 11:50:48,2020-04-11 12:21:05,Damen Ave & Foster Ave,464,Sheffield Ave & Willow St,93,41.9756,-87.6795,41.9137,-87.6529,member
42391,D175770C9B75EBFE,docked_bike,2020-04-11 15:55:58,2020-04-11 16:06:08,Southport Ave & Clybourn Ave,307,Walsh Park,628,41.9208,-87.6637,41.9146,-87.6680,member
42392,CEC7DF66A37DC385,docked_bike,2020-04-18 16:38:17,2020-04-18 17:56:49,Dearborn St & Erie St,110,Wells St & Hubbard St,212,41.8940,-87.6293,41.8899,-87.6343,member
...,...,...,...,...,...,...,...,...,...,...,...,...,...
84771,200E9CDFC5685AA0,docked_bike,2020-04-16 16:10:16,2020-04-16 16:23:11,Dearborn Pkwy & Delaware Pl,140,Dearborn Pkwy & Delaware Pl,140,41.8990,-87.6299,41.8990,-87.6299,member
84772,F58A8F2ABCB5D95B,docked_bike,2020-04-30 17:56:12,2020-04-30 18:15:21,Kimbark Ave & 53rd St,322,Cottage Grove Ave & 51st St,351,41.7996,-87.5947,41.8030,-87.6066,casual
84773,A3754693A80E4913,docked_bike,2020-04-24 19:57:33,2020-04-24 21:50:43,Sedgwick St & Schiller St,236,Wells St & Elm St,182,41.9076,-87.6386,41.9032,-87.6343,casual
84774,D610CABB67F7B744,docked_bike,2020-04-02 17:59:55,2020-04-02 18:42:26,Damen Ave & Charleston St,310,Damen Ave & Charleston St,310,41.9201,-87.6779,41.9201,-87.6779,casual
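
The two outputs above are consistent with a contiguous block split: rank 0 holds the first rows and rank 1 starts at index 42388. As a minimal sketch, assuming such a block scheme (Bodo's internal distribution logic may differ in detail), the hypothetical helper `block_ranges` computes which row range each rank would own. The row count 84,776 is taken from the group-by totals reported later in this notebook (61,148 member plus 23,628 casual rides).

```python
def block_ranges(n_rows, n_ranks):
    """Split n_rows into contiguous [start, stop) blocks, one per rank."""
    base, extra = divmod(n_rows, n_ranks)
    ranges = []
    start = 0
    for rank in range(n_ranks):
        # The first `extra` ranks take one additional row each.
        stop = start + base + (1 if rank < extra else 0)
        ranges.append((start, stop))
        start = stop
    return ranges


two_rank_split = block_ranges(84_776, 2)
uneven_split = block_ranges(10, 3)
print(two_rank_split)  # [(0, 42388), (42388, 84776)]
print(uneven_split)    # [(0, 4), (4, 7), (7, 10)]
```

With this scheme the block sizes differ by at most one row, which is one simple way to keep the load balanced across ranks.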


# Let's run this on the entire data folder. With Bodo, you can just point to the location of the data and Bodo automatically shards it across the available ranks.

In [20]:
%%px
import pandas as pd
import time
import bodo
@bodo.jit(cache=True)
def load_data_bodo(filename):
    start=time.time()
    df=pd.read_csv(filename)
    print(f"time to run query {time.time()-start}")
    return df
df=load_data_bodo("data/")

[stdout:0] time to run query 4.700581


%px: 100%|████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 4/4 [00:05<00:00,  1.39s/tasks]
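
For comparison, plain pandas `read_csv` expects a single file, so loading a folder serially takes an explicit loop over the files. The sketch below shows that loop using only the standard library; `read_csv_folder` is a hypothetical helper, and the demo file names and contents are made up so the example runs without the Divvy data. It assumes every file in the folder shares the same header.

```python
import csv
import tempfile
from pathlib import Path


def read_csv_folder(folder):
    """Read every *.csv file in `folder` (sorted by name) into one list of dict rows."""
    rows = []
    for path in sorted(Path(folder).glob("*.csv")):
        with path.open(newline="") as handle:
            rows.extend(csv.DictReader(handle))
    return rows


# Build a tiny demo folder; file names and contents are invented for illustration.
with tempfile.TemporaryDirectory() as demo_dir:
    for name, ride_ids in [("part1.csv", ["r1", "r2"]), ("part2.csv", ["r3"])]:
        with (Path(demo_dir) / name).open("w", newline="") as handle:
            writer = csv.writer(handle)
            writer.writerow(["ride_id", "member_casual"])
            writer.writerows([ride_id, "member"] for ride_id in ride_ids)
    all_rows = read_csv_folder(demo_dir)

print(len(all_rows))  # 3
```

This serial loop reads one file after another in a single process, whereas the jitted cell above hands the whole folder to Bodo and lets it split the work across ranks.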


#### Notice how the Divvy CSV data has been distributed across the ranks.
The Bodo function has taken a single-threaded pandas function and converted it into distributed code, which produced a distributed dataframe.
Unlike Databricks or Spark, Bodo dataframes always stay distributed, even when returned to regular Python. Let's see a quick example.

In [9]:
%%px
import time
def groupbymember_casual(column_name,df):
    start=time.time()
    gdf=df.groupby(column_name,as_index=False)["ride_id"].count()
    print(f"time for group by bodo {time.time()-start}")
    return gdf
gdf=groupbymember_casual(["member_casual"],df)

[stdout:1] time for group by bodo 0.007768154144287109


[stdout:0] time for group by bodo 0.008436918258666992


In [10]:
%%px
gdf

Unnamed: 0,member_casual,ride_id
0,casual,12339
1,member,30049


Unnamed: 0,member_casual,ride_id
0,casual,11289
1,member,31099


Notice how the dataframe was grouped and the rides were counted based on the member_casual value.
<br>The above function was not jitted, though, so it ran as a regular pandas function on each rank.
<br>Although it returned an output, it was not the right answer to this problem.
<br> pandas was only able to give you the counts for the chunk on each rank. It did not have the capability to treat this chunked dataset, spread across multiple cores, as a single dataframe <br>and calculate the answer.
<br>How do we do this calculation across the data present in all the ranks? Simple: add the @bodo.jit decorator to the function.

In [11]:
%%px
import time
@bodo.jit(cache=True)
def groupbymember_casual(column_name,df):
    start=time.time()
    gdf=df.groupby(column_name,as_index=False)["ride_id"].count()
    print(f"time for group by bodo {time.time()-start}")
    return gdf
gdf=groupbymember_casual(["member_casual"],df)

[stdout:0] time for group by bodo 0.010328


%px: 100%|████████████████████████████████████████████████████████████████████████████████████████████████████| 2/2 [00:00<00:00, 65.06tasks/s]


In [12]:
%%px
gdf

Unnamed: 0,member_casual,ride_id


Unnamed: 0,member_casual,ride_id
0,member,61148
1,casual,23628


Notice how simply adding the decorator made this function distributed: data was shuffled across the ranks to provide an answer for the entire dataset.
<br> The same concept can be extended to more cores and to other use cases such as pivoting and joins.
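
The relationship between the per-rank pandas counts and the jitted result can be checked by hand. The sketch below only reproduces the arithmetic with the numbers printed in this notebook; it says nothing about how Bodo moves data between ranks, which it does through MPI rather than through a Python `Counter`.

```python
from collections import Counter

# Partial counts printed by the non-jitted pandas function on each rank
rank0 = Counter({"casual": 12339, "member": 30049})
rank1 = Counter({"casual": 11289, "member": 31099})

# Adding the partial counts gives the totals for the whole dataset
combined = rank0 + rank1
print(dict(combined))  # {'casual': 23628, 'member': 61148}
```

The sums match the jitted output (61,148 member rides and 23,628 casual rides), which confirms that the plain pandas version returned correct but partial per-rank answers.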

For those not familiar with pandas, we have an alternative: BodoSQL. BodoSQL takes regular SQL and converts it to fast code that can be scaled easily across 1000s of cores. An example follows.

In [13]:
%%px
import os
import time
#os.environ["CONDA_PREFIX"]="/opt/conda/"
import bodosql
@bodo.jit(cache=True)
def groupbymember_casual_sql(column_names,df):
    start=time.time()
    bc = bodosql.BodoSQLContext({"divvy": df})
    gdf=bc.sql(f"""select {column_names},count(ride_id) from divvy group by {column_names}""")
    with bodo.objmode:
        time.sleep(5)
    print(f"time for group by sql {time.time()-start}")
    return gdf
groupbymember_casual_sql("member_casual,rideable_type",df)

[stdout:0] time for group by sql 5.084388


Unnamed: 0,member_casual,rideable_type,EXPR$2
1,casual,docked_bike,23628


Unnamed: 0,member_casual,rideable_type,EXPR$2
0,member,docked_bike,61148


%px: 100%|████████████████████████████████████████████████████████████████████████████████████████████████████| 2/2 [00:08<00:00,  4.41s/tasks]
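
The query semantics can be tried without a cluster. The sketch below uses Python's built-in `sqlite3` as a stand-in for BodoSQL (an assumption made purely for illustration; it is a different engine) and loads only the first five rides shown in the dataframe above. It also gives the count an alias, `ride_count`, which is a hypothetical name; in the output above the unaliased column appears as `EXPR$2`, and an alias in the query would be the usual SQL way to get a readable name.

```python
import sqlite3

# First five rides from the dataframe shown earlier (ride_id, rideable_type, member_casual)
rows = [
    ("A847FADBBC638E45", "docked_bike", "member"),
    ("5405B80E996FF60D", "docked_bike", "member"),
    ("5DD24A79A4E006F4", "docked_bike", "member"),
    ("2A59BBDF5CDBA725", "docked_bike", "member"),
    ("27AD306C119C6158", "docked_bike", "casual"),
]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE divvy (ride_id TEXT, rideable_type TEXT, member_casual TEXT)")
conn.executemany("INSERT INTO divvy VALUES (?, ?, ?)", rows)

cursor = conn.execute(
    """
    SELECT member_casual, rideable_type, COUNT(ride_id) AS ride_count
    FROM divvy
    GROUP BY member_casual, rideable_type
    ORDER BY member_casual
    """
)
columns = [description[0] for description in cursor.description]
result = cursor.fetchall()
conn.close()

print(columns)  # ['member_casual', 'rideable_type', 'ride_count']
print(result)   # [('casual', 'docked_bike', 1), ('member', 'docked_bike', 4)]
```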


Hopefully this exercise allowed you to see how simple and powerful Bodo can be. Bodo is also versatile and offers multiple development options, such as Python/pandas and SQL.
Run the code cell below to stop the 2-rank cluster we created.

In [17]:
rc.cluster.stop_cluster_sync()

Stopping controller
Stopping engine(s): 1647970024


# Bodo can work with any cloud provider and any data store: S3, ADLS, GCS, HDFS and so on. Bodo also works with shared file systems.
<br> Please reach out to us at contact@bodo.ai or at https://www.bodo.ai/contact.