In [None]:
# BASED ON: https://medium.com/intel-analytics-software/distributed-xgboost-with-modin-on-ray-fc17edef7720

DATASET:
    - HIGGS.csv
    - 8GB
    
    
PLATFORM:
    - M1 - Both MODIN and RAY/PANDAS worked; SharedRayDMatrix with (parquet) NOT TESTED
    - AWS SM ml.m3.medium - NONE worked - OOM Errors
        - NEXT STEP: find the minimum instance size that allows execution
    
FINDINGS:
    - MODIN with MODIN.DISTRIBUTED datasets offers by far the best performance (measured in milliseconds vs. seconds)
    - Neither MODIN nor RAY/PANDAS speeds up when the "num_actors" parameter is increased
    - RAY with PANDAS data does take some advantage of a higher "cpus_per_actor" parameter when measured by Wall Time
    - SharedRayDMatrix with (parquet) dies with an OOM Error on a 4GB Mem Instance - due to XGBoost loading the whole input data into memory
        - NEXT STEP - test with PIPE Model on AWS Implementation - See AWS notebook
        - NEXT STEP - Test with Modin and Parquet ... in progress

In [None]:
%%time
# TEST 1 - GET DATA
# - XGBoost on the HIGGS dataset
# - Pure Pandas (CSV) vs. Modin (CSV) vs. SharedRayDMatrix with (parquet)
#
# Builds `dmatrix` for the engine selected below. Each branch imports its own
# stack on purpose: `pd` ends up bound to either pandas or modin.pandas, and
# `train` / `xgb` only exist for the engines that use them. The RUN MODEL cell
# dispatches on the same `engine` value.

engine = 1 # 0 Pandas, 1 Modin, 2 Sharded RayDMatrix

if engine == 0:
    print("engine: Pandas")
    import pandas as pd
    from xgboost_ray import RayDMatrix, RayParams, train
    df = pd.read_csv("HIGGS.csv")
    # Features: every column except the last; label: the last column.
    dmatrix = RayDMatrix(df.iloc[:, :-1], df.iloc[:, -1])
elif engine == 1:
    print("engine: Modin")
    import modin.pandas as pd
    import modin.experimental.xgboost as xgb
    import ray
    # ignore_reinit_error lets this cell be re-run without restarting the kernel.
    ray.init(ignore_reinit_error=True)
    # Stage markers: on memory-constrained instances the last message printed
    # shows which step the run died in.
    print("ray initialised")
    df = pd.read_csv("HIGGS.csv")
    print("HIGGS.csv loaded")
    # Features: every column except the last; label: the last column.
    dmatrix = xgb.DMatrix(df.iloc[:, :-1], df.iloc[:, -1])
    print("DMatrix built")
elif engine == 2:
    print("engine: SharedRayDMatrix with (parquet)")
    import glob
    from xgboost_ray import RayDMatrix, RayFileType, RayParams, train

    # List of files to pass to the estimator
    path = list(sorted(glob.glob("HIGGS_DATA/*.parquet")))
    print("PATH", path)

    # OPTIONAL: specify which columns of the .parquet files to load into the
    # estimator - in this test the column selection was removed.
    # Example of such a list (placeholder names, not HIGGS columns):
    #
    # columns = ["passenger_count",
    #     "trip_distance", "pickup_longitude", "pickup_latitude",
    #     "dropoff_longitude", "dropoff_latitude",
    #     "fare_amount", "extra", "mta_tax", "tip_amount",
    #     "tolls_amount", "total_amount"]

    dmatrix = RayDMatrix(path,
                         label = "28", #"passenger_count",  # Will select this column as the label
                         #columns=columns,
                         # ignore=["total_amount"],  # Optional list of columns to ignore
                         filetype=RayFileType.PARQUET)
else:
    # Fail loudly instead of leaving `dmatrix` undefined (or stale from a previous run).
    raise ValueError(
        f"Unknown engine {engine!r}; expected 0 (Pandas), 1 (Modin) or 2 (Sharded RayDMatrix)"
    )

In [None]:
%%time
# TEST 1 - RUN MODEL
# Trains with default booster parameters ({}) on the `dmatrix` built by the
# GET DATA cell. `engine`, `dmatrix` and the training entry points (`train`,
# `RayParams`, `xgb`) must already exist in the kernel from that cell.
# Edit num_actors / cpus_per_actor here to reproduce the timings noted below.

if engine == 0:
    model = train({}, dmatrix, ray_params=RayParams(num_actors=1, cpus_per_actor=10))
elif engine == 1:
    model = xgb.train({}, dmatrix, num_actors=1)
elif engine == 2:
    model = train({}, dmatrix, ray_params=RayParams(num_actors=1, cpus_per_actor=2))
else:
    # Without this guard an unknown engine would silently keep a `model`
    # left over from an earlier run (or leave it undefined).
    raise ValueError(
        f"Unknown engine {engine!r}; expected 0 (Pandas), 1 (Modin) or 2 (Sharded RayDMatrix)"
    )

In [None]:
# WITH PANDAS - M1
num_actors=10, cpus_per_actor=1
in 190.63 seconds (106.33 pure XGBoost training time).
CPU times: user 5.96 s, sys: 2min 39s, total: 2min 45s
Wall time: 3min 10s

num_actors=1, cpus_per_actor=1
20.36 seconds (9.84 pure XGBoost training time).
CPU times: user 411 ms, sys: 16.2 s, total: 16.6 s
Wall time: 20.4 s

num_actors=1, cpus_per_actor=10    
in 15.64 seconds (9.45 pure XGBoost training time).
CPU times: user 467 ms, sys: 15.5 s, total: 16 s
Wall time: 15.9 s
    
# WITH MODIN - M1
num_actors=10
2nd RUN!!! (without reload of ray and modin)
CPU times: user 63.1 ms, sys: 42 ms, total: 105 ms
Wall time: 8.69 s
    
1st RUN - CPU times around 120ms!!!
    
num_actors=1    
1st RUN
CPU times: user 149 ms, sys: 43.3 ms, total: 192 ms
Wall time: 16.8 s

2nd RUN!!! (without reload of ray and modin)
CPU times: user 32 ms, sys: 25.1 ms, total: 57.1 ms
Wall time: 10.6 s    
    
    
num_actors=10 with ray.init()
1st RUN
CPU times: user 122 ms, sys: 51.4 ms, total: 173 ms
Wall time: 9.46 s
    
2nd RUN!!!! (without reload of ray and modin)
CPU times: user 65.3 ms, sys: 38.5 ms, total: 104 ms
Wall time: 8.22 s

# SharedRayDMatrix with (parquet) - AWS ml.m3.medium

- OOM Error 

In [None]:
# Fix OOM with Modin -- Partially Successful!

BASED ON: https://modin.readthedocs.io/en/stable/getting_started/why_modin/out_of_core.html

- Presumably Modin should allow work with datasets that do not fit into memory.
- While it delivered better performance than pure pandas, we still run into OOM errors
- USE ray.init(_plasma_directory="/tmp") - A SETTING TO DISABLE OUT-OF-CORE RAY, which lets MODIN handle larger datasets but still results in OOM
- POSSIBLY execution failed because the ml.m3.medium instance has only 4GB of RAM, so the OOM errors might be caused not by the size of the data itself but
  by the internal workings of the algorithm or by system-level issues

- NEXT STEP: test on bigger instance with more data
- NEXT STEP: Implement Generators

In [None]:
# INPUT SIZE CHECK
# Element counts as a tuple: 2**15 * 2**8 = 8,388,608 is the number of cells in
# one (2**15, 2**8) random block as built in the memory tests of this notebook;
# 2**15 * 2**4 = 524,288 is the same row count with 16 columns.
# NOTE(review): the 16-column shape is not used in any visible cell - confirm whether it is still needed.
2**15 * 2**8, 2**15 * 2**4

In [None]:
# WITH PURE PANDAS
# Builds 20 random-integer blocks of shape (2**15, 2**8) in a list and
# concatenates them in a single call, then prints the frame summary.
# `pandas` is imported under its full name, so this cell does not rebind the
# `pd` alias that other cells point at pandas or modin.pandas.
# NOTE(review): assuming 8-byte integers, each block is about 64 MiB - confirm the dtype on the target platform.
import pandas
import numpy as np
df = pandas.concat([pandas.DataFrame(np.random.randint(0, 100, size=(2**15, 2**8))) for _ in range(20)]) # Memory Error!
df.info()

In [None]:
# WITH PURE PANDAS - ITERATIVE
# Grows a single frame by appending one fresh (2**15, 2**8) random-integer
# block per iteration and prints df.info() after every append, so the last
# summary printed shows how far the loop got before failing.
# NOTE: `import pandas as pd` binds `pd` to plain pandas; other cells bind the
# same alias to modin.pandas.
import pandas as pd
import numpy as np

df = pd.DataFrame()
for i in range(50):    
    df = pd.concat([df, pd.DataFrame(np.random.randint(0, 100, size=(2**15, 2**8)))]) # BREAKS around the 20th-25th iteration!!!
    df.info()

In [None]:
# WITH MODIN
# Same single-call concat experiment as the pure-pandas version, but on
# modin.pandas with Ray started with _plasma_directory="/tmp".

# ray.shutdown() runs in a `finally`, so a run that raises (e.g. an OOM error
# surfaced as an exception) no longer leaves a live Ray session behind.
# NOTE(review): if the OS kills the kernel process itself, a kernel restart is still needed.
import modin.pandas as pd
import numpy as np
import ray

ray.init(_plasma_directory="/tmp") # SETTING TO DISABLE OUT-OF-CORE RAY !?!??! - MAKES IT ALL WORK

try:
    #df = pd.concat([pd.DataFrame(np.random.randint(0, 100, size=(2**15, 2**8))) for _ in range(25)]) # Working!!!
    df = pd.concat([pd.DataFrame(np.random.randint(0, 100, size=(2**15, 2**8))) for _ in range(50)]) # Working with PLASMA!!!
    #df = pd.concat([pd.DataFrame(np.random.randint(0, 100, size=(2**15, 2**8))) for _ in range(100)]) # OOM ERROR!!!
    df.info()
finally:
    # Always release the Ray session, whether or not the experiment succeeded.
    ray.shutdown()

In [None]:
# Manual cleanup: stops the Ray session if a previous cell failed before reaching
# its own ray.shutdown(). Requires `ray` to have been imported by an earlier cell.
ray.shutdown()

In [None]:
# WITH MODIN - ITERATIVE
# Same incremental concat experiment as the pure-pandas iterative version, but
# on modin.pandas with a default ray.init().
import ray
ray.init()

import modin.pandas as pd
import numpy as np

try:
    df = pd.DataFrame()
    for i in range(50):
        # Append one fresh (2**15, 2**8) random-integer block per iteration.
        df = pd.concat([df, pd.DataFrame(np.random.randint(0, 100, size=(2**15, 2**8)))]) # Working!!!
        df.info()
finally:
    # Release the Ray session even when an iteration raises, so the next
    # experiment can call ray.init() without a kernel restart.
    ray.shutdown()

In [None]:
# ADDITIONAL MINI TESTS OF PANDAS vs. MODIN

In [None]:
%%time
# Timed CSV load. Uses whichever module `pd` is currently bound to in the kernel
# (pandas or modin.pandas) - presumably switched by hand between the two timed runs.
df = pd.read_csv("HIGGS.csv")

In [None]:
M1 with 10 CORES

with PANDAS
CPU times: user 35.4 s, sys: 3.48 s, total: 38.9 s
Wall time: 40.1 s
    
with MODIN
CPU times: user 3.44 s, sys: 2.02 s, total: 5.45 s
Wall time: 23.9 s

In [None]:
%%time
# Timed summary statistics over the `df` loaded by the read_csv benchmark cell; printed as plain text.
print(df.describe())

In [None]:
M1 with 10 CORES

with PANDAS
CPU times: user 7.55 s, sys: 1.33 s, total: 8.87 s
Wall time: 9.33 s
    
with MODIN
CPU times: user 102 ms, sys: 63.5 ms, total: 165 ms
Wall time: 11.6 s

In [None]:
%%time
# Timed group-by: groups on the first column of `df` and counts the non-null values per group.
df.groupby(df.columns[0]).count()

In [None]:
M1 with 10 CORES

with PANDAS
CPU times: user 1.11 s, sys: 101 ms, total: 1.21 s
Wall time: 1.21 s
    
with MODIN
CPU times: user 28.3 ms, sys: 13.3 ms, total: 41.6 ms
Wall time: 409 ms

In [None]:
%%time
# Timed column-wise maximum over `df`.
df.max()

In [None]:
M1 with 10 CORES

with PANDAS
CPU times: user 415 ms, sys: 4.53 ms, total: 420 ms
Wall time: 417 ms
    
with MODIN
CPU times: user 16 ms, sys: 3.13 ms, total: 19.1 ms
Wall time: 18.6 ms

In [None]:
#########################################
###### HELPER FUCNTIONS, ENV SETUP ######
#########################################

!pip install -U ray xgboost_ray modin pyarrow
# modin[ray] -- Results in BrokenPipe Error and without specyfying ray it works.

In [None]:
### GET HIGGS DATASET -- CSV
# Downloads the zipped HIGGS CSV from S3 and unpacks it into the working directory.

import zipfile

import boto3  # AWS Python SDK
from sagemaker import get_execution_role

# An execution role is needed when running on SageMaker
role = get_execution_role()

# Bucket name, remote object key and local destination
my_bucket = 'data-distributions'
orig_file = 'HIGGS/HIGGS.csv.zip'
dest_file = 'HIGGS.csv.zip'

# Fetch the archive from the S3 bucket
s3 = boto3.resource('s3')
bucket = s3.Bucket(my_bucket)
bucket.download_file(orig_file, dest_file)

# Unpack the downloaded archive next to the notebook
with zipfile.ZipFile(dest_file, 'r') as archive:
    archive.extractall("./")

In [None]:
# CREATE PARQUET FILES:
# Splits the `df` currently loaded in the kernel into 40 row chunks and writes
# each one as a parquet file into HIGGS_DATA/, which is the directory the
# parquet loader in TEST 1 (engine 2) globs ("HIGGS_DATA/*.parquet").
# Previously the files were written as "HIGHGS_<n>.parquet" into the working
# directory, where that glob never found them.

import os

import numpy as np

PARQUET_DIR = "HIGGS_DATA"
os.makedirs(PARQUET_DIR, exist_ok=True)

# Column labels are converted to strings before writing (the loader selects its label column by the string "28").
# Note: this renames the columns of `df` in place.
col_str = [str(i) for i in df.columns]
df.columns = col_str

list_df = np.array_split(df, 40)

for n, chunk in enumerate(list_df):
    print(n, chunk.shape, type(chunk))
    chunk.to_parquet(os.path.join(PARQUET_DIR, 'HIGGS_' + str(n) + '.parquet'), engine='pyarrow')

In [None]:
# MULTIPLY PARQUET FILES
# Clones one existing parquet file 256 times to inflate the dataset on disk.
# The copies are numbered 98 through 353.
# NOTE(review): the "abalone" paths look carried over from another experiment - confirm before reuse.

import shutil

src = "PARQUET/abalone_train_33.parquet"

for file_id in map(str, range(98, 98 + 256)):
    dst = "PARQUET/abalone_train_" + file_id + ".parquet"
    shutil.copy2(src, dst)

In [None]:
# PARTITIONED MODIN DATASET - ATTEMPT - TBC

In [None]:
#import modin.pandas as pd
#import ray

# TO CONNECT WITH EXISTING CLUSTER
#ray.init(address="auto")
#ray.init(address="127.0.0.1:PORT")

# NOTE(review): with the imports above commented out, this cell relies on `pd`
# and `model` already being defined by earlier cells in the running kernel.
df = pd.read_csv("HIGGS.csv")

# Displays the (best_iteration, attributes) pair of the previously trained model.
model.best_iteration, model.attributes()

In [None]:
# Round trip: Modin DataFrame -> row partitions -> Modin DataFrame.
import numpy as np
import modin.pandas as pd
from modin.distributed.dataframe.pandas import from_partitions, unwrap_partitions

# 1024 x 256 matrix of random integers in [0, 100)
data = np.random.randint(low=0, high=100, size=(1024, 256))
df = pd.DataFrame(data)
print(df)

# Split the frame into its row-axis partitions.
# Variant that also returns node IPs: unwrap_partitions(df, axis=0, get_ip=True)
partitions = unwrap_partitions(df, axis=0)
print(partitions)  # other things worth inspecting: type(partitions), len(partitions), partitions[0]

# Reassemble a Modin DataFrame from those partitions.
new_df = from_partitions(partitions, axis=0)
print(new_df)

In [None]:
%%time
# Train on the DataFrame rebuilt from the unwrapped partitions.
# `partitions` is the list returned by unwrap_partitions and has no `.iloc`,
# so the DMatrix must be built from `new_df` (the from_partitions result).
# Requires the previous cell to have defined `new_df`.
import modin.experimental.xgboost as xgb  # explicit, so the cell does not depend on the TEST 1 Modin branch having run

# Features: every column except the last; label: the last column.
dmatrix = xgb.DMatrix(new_df.iloc[:, :-1], new_df.iloc[:, -1])
model = xgb.train({}, dmatrix, num_actors=10)