Merge pull request #7 from fonhorst/main
Implementation of Parallel SLAMA
dev-rinchin committed Jul 13, 2023
2 parents 7ff5bc0 + 452fa86 commit 6023ee6
Showing 86 changed files with 5,961 additions and 2,399 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -165,6 +165,7 @@ dumps
*.orig

jars/*.jar
!jars/spark-lightautoml_*.jar

docs/generated
docs/slama-docs/generated
2 changes: 1 addition & 1 deletion docs/yarn_cluster_spark_submit.rst
@@ -69,7 +69,7 @@ To launch example 'tabular-preset-automl.py' (the most comprehensive example) run
--conf "spark.driver.extraJavaOptions=-Dio.netty.tryReflectionSetAccessible=true" \
--conf "spark.executor.extraJavaOptions=-Dio.netty.tryReflectionSetAccessible=true" \
--conf "spark.jars.repositories=https://mmlspark.azureedge.net/maven"
--conf "spark.jars.packages=com.microsoft.azure:synapseml_2.12:0.9.5,io.github.fonhorst:spark-lightautoml_2.12:0.1"
--conf "spark.jars.packages=com.microsoft.azure:synapseml_2.12:0.9.5,io.github.fonhorst:spark-lightautoml_2.12:0.1.1"
--conf "spark.sql.warehouse.dir=${WAREHOUSE_DIR}" \
--py-files "examples-spark/*,tabular_config.yml" \
--num-executors "${EXECUTOR_INSTANCES}" \
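The same dependency bump applies when a session is built programmatically rather than through spark-submit. A minimal sketch mirroring the configuration above (package coordinates and JVM options are taken from this diff; the master URL is illustrative):

```python
from pyspark.sql import SparkSession

# Mirrors the --conf lines above, with spark-lightautoml bumped to 0.1.1
spark = (
    SparkSession.builder
    .master("local[4]")  # illustrative; on YARN this comes from spark-submit
    .config("spark.jars.repositories", "https://mmlspark.azureedge.net/maven")
    .config("spark.jars.packages",
            "com.microsoft.azure:synapseml_2.12:0.9.5,io.github.fonhorst:spark-lightautoml_2.12:0.1.1")
    .config("spark.driver.extraJavaOptions", "-Dio.netty.tryReflectionSetAccessible=true")
    .config("spark.executor.extraJavaOptions", "-Dio.netty.tryReflectionSetAccessible=true")
    .getOrCreate()
)
```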
14 changes: 7 additions & 7 deletions examples/spark/automl_feature_scores.py
@@ -1,11 +1,11 @@
import logging.config

from examples_utils import get_spark_session, prepare_test_and_train, get_dataset_attrs
from examples_utils import get_spark_session, prepare_test_and_train, get_dataset
from sparklightautoml.automl.presets.tabular_presets import SparkTabularAutoML
from sparklightautoml.tasks.base import SparkTask
from sparklightautoml.utils import log_exec_timer, logging_config, VERBOSE_LOGGING_FORMAT

logging.config.dictConfig(logging_config(level=logging.INFO, log_filename='/tmp/slama.log'))
logging.config.dictConfig(logging_config(log_filename='/tmp/slama.log'))
logging.basicConfig(level=logging.DEBUG, format=VERBOSE_LOGGING_FORMAT)
logger = logging.getLogger(__name__)

@@ -20,11 +20,11 @@
cv = 2
use_algos = [["linear_l2"]]
dataset_name = "used_cars_dataset"
path, task_type, roles, dtype = get_dataset_attrs(dataset_name)
dataset = get_dataset(dataset_name)

with log_exec_timer("spark-lama training") as train_timer:
task = SparkTask(task_type)
train_data, test_data = prepare_test_and_train(spark, path, seed)
task = SparkTask(dataset.task_type)
train_data, test_data = prepare_test_and_train(dataset, seed)

test_data_dropped = test_data

@@ -40,7 +40,7 @@

oof_predictions = automl.fit_predict(
train_data,
roles=roles
roles=dataset.roles
).persist()

logger.info("Predicting on out of fold")
@@ -50,7 +50,7 @@

logger.info(f"score for out-of-fold predictions: {metric_value}")

feature_scores = automl.get_feature_scores(calc_method="fast", data=test_data_dropped, silent=False)
feature_scores = automl.get_feature_scores(data=test_data_dropped, silent=False)

print(feature_scores)

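Taken together, the example scripts now obtain path, task type, roles and dtypes from a single Dataset object instead of a tuple. A condensed sketch of the updated flow (dataset name, seed and partition count are illustrative; other SparkTabularAutoML constructor arguments are omitted):

```python
from examples_utils import get_spark_session, prepare_test_and_train, get_dataset
from sparklightautoml.automl.presets.tabular_presets import SparkTabularAutoML
from sparklightautoml.tasks.base import SparkTask

spark = get_spark_session(partitions_num=4)

# All dataset attributes now travel together in one object
dataset = get_dataset("used_cars_dataset")
train_data, test_data = prepare_test_and_train(dataset, seed=42)

automl = SparkTabularAutoML(spark=spark, task=SparkTask(dataset.task_type))
oof_preds = automl.fit_predict(train_data, roles=dataset.roles).persist()
feature_scores = automl.get_feature_scores(data=test_data, silent=False)
```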
14 changes: 7 additions & 7 deletions examples/spark/automl_multiclass.py
@@ -5,7 +5,7 @@
from pyspark.sql import functions as sf

from examples_utils import get_persistence_manager, BUCKET_NUMS
from examples_utils import get_spark_session, prepare_test_and_train, get_dataset_attrs
from examples_utils import get_spark_session, prepare_test_and_train, get_dataset
from sparklightautoml.automl.presets.tabular_presets import SparkTabularAutoML
from sparklightautoml.dataset.base import SparkDataset
from sparklightautoml.tasks.base import SparkTask
@@ -29,12 +29,12 @@
cv = 2
use_algos = [["lgb"]]
dataset_name = "ipums_97"
path, task_type, roles, dtype = get_dataset_attrs(dataset_name)
dataset = get_dataset(dataset_name)

train_data, test_data = prepare_test_and_train(spark, path, seed)
train_data, test_data = prepare_test_and_train(dataset, seed)

with log_exec_timer("spark-lama training") as train_timer:
task = SparkTask(task_type)
task = SparkTask(dataset.task_type)

automl = SparkTabularAutoML(
spark=spark,
@@ -46,7 +46,7 @@
tuning_params={'fit_on_holdout': True, 'max_tuning_iter': 10, 'max_tuning_time': 3600}
)

preds = automl.fit_predict(train_data, roles, persistence_manager=persistence_manager).persist()
preds = automl.fit_predict(train_data, dataset.roles, persistence_manager=persistence_manager).persist()

logger.info("Predicting on out of fold")

@@ -69,7 +69,7 @@
score = task.get_dataset_metric()
expected_metric_value = score(te_pred.select(
SparkDataset.ID_COLUMN,
sf.col(roles['target']).alias('target'),
sf.col(dataset.roles['target']).alias('target'),
sf.col(pred_column).alias('prediction')
))

@@ -88,7 +88,7 @@
score = task.get_dataset_metric()
actual_metric_value = score(te_pred.select(
SparkDataset.ID_COLUMN,
sf.col(roles['target']).alias('target'),
sf.col(dataset.roles['target']).alias('target'),
sf.col(pred_column).alias('prediction')
))
logger.info(f"score for test predictions via loaded pipeline: {actual_metric_value}")
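The multiclass example additionally wires an explicit persistence manager into fit_predict. A small sketch of that wiring, guessed from the names introduced in this commit (the manager name is hypothetical; get_persistence_manager presumably falls back to the PERSISTENCE_MANAGER environment variable when no name is passed):

```python
import os

from examples_utils import get_persistence_manager

# Hypothetical value; concrete manager classes live in sparklightautoml.dataset.persistence
os.environ["PERSISTENCE_MANAGER"] = "PlainCachePersistenceManager"
persistence_manager = get_persistence_manager()

# Then, as in the example above:
# preds = automl.fit_predict(train_data, dataset.roles, persistence_manager=persistence_manager).persist()
```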
212 changes: 150 additions & 62 deletions examples/spark/examples_utils.py
@@ -1,15 +1,35 @@
import os
import inspect
from typing import Tuple, Optional
import os
from dataclasses import dataclass, field
from typing import Tuple, Optional, Any, Dict

from pyspark import SparkContext
from pyspark.sql import SparkSession

from sparklightautoml.utils import SparkDataFrame
from sparklightautoml.dataset import persistence
from sparklightautoml.utils import SparkDataFrame, get_current_session


BUCKET_NUMS = 16
BUCKET_NUMS = 6
PERSISTENCE_MANAGER_ENV_VAR = "PERSISTENCE_MANAGER"
BASE_DATASETS_PATH = "file:///opt/spark_data/"


@dataclass(frozen=True)
class Dataset:
path: str
task_type: str
roles: Dict[str, Any]
dtype: Dict[str, str] = field(default_factory=dict)
file_format: str = 'csv'
file_format_options: Dict[str, Any] = field(default_factory=lambda: {"header": True, "escape": "\""})

def load(self) -> SparkDataFrame:
spark = get_current_session()
return spark.read.format(self.file_format).options(**self.file_format_options).load(self.path)


def ds_path(rel_path: str) -> str:
return os.path.join(BASE_DATASETS_PATH, rel_path)


used_cars_params = {
@@ -32,75 +52,73 @@
}

DATASETS = {
"used_cars_dataset": {
"path": "file:///opt/spark_data/small_used_cars_data.csv",
**used_cars_params
},

"used_cars_dataset_1x": {
"path": "file:///opt/spark_data/derivative_datasets/1x_dataset.csv",
"used_cars_dataset": Dataset(
path=ds_path("small_used_cars_data.csv"),
**used_cars_params
},

"used_cars_dataset_4x": {
"path": "file:///opt/spark_data/derivative_datasets/4x_dataset.csv",
),
"used_cars_dataset_1x": Dataset(
path=ds_path("derivative_datasets/1x_dataset.csv"),
**used_cars_params
},

),
"used_cars_dataset_4x": Dataset(
path=ds_path("derivative_datasets/4x_dataset.csv"),
**used_cars_params
),
"lama_test_dataset": Dataset(
path=ds_path("sampled_app_train.csv"),
task_type="binary",
roles={"target": "TARGET", "drop": ["SK_ID_CURR"]}
),
# https://www.openml.org/d/4549
"buzz_dataset": {
"path": "file:///opt/spark_data/Buzzinsocialmedia_Twitter_25k.csv",
"task_type": "reg",
"roles": {"target": "Annotation"},
},

"lama_test_dataset": {
"path": "file:///opt/spark_data/sampled_app_train.csv",
"task_type": "binary",
"roles": {"target": "TARGET", "drop": ["SK_ID_CURR"]},
},

"buzz_dataset": Dataset(
path=ds_path("Buzzinsocialmedia_Twitter_25k.csv"),
task_type="binary",
roles={"target": "TARGET", "drop": ["SK_ID_CURR"]}
),
# https://www.openml.org/d/734
"ailerons_dataset": {
"path": "file:///opt/spark_data/ailerons.csv",
"task_type": "binary",
"roles": {"target": "binaryClass"},
},

"ailerons_dataset": Dataset(
path=ds_path("ailerons.csv"),
task_type="binary",
roles={"target": "binaryClass"}
),
# https://www.openml.org/d/382
"ipums_97": {
"path": "file:///opt/spark_data/ipums_97.csv",
"task_type": "multiclass",
"roles": {"target": "movedin"},
},

"company_bankruptcy_dataset": {
"path": "file:///opt/spark_data/company_bankruptcy_prediction_data.csv",
"task_type": "binary",
"roles": {"target": "Bankrupt?"},
}
"ipums_97": Dataset(
path=ds_path("ipums_97.csv"),
task_type="multiclass",
roles={"target": "movedin"}
),

"company_bankruptcy_dataset": Dataset(
path=ds_path("company_bankruptcy_prediction_data.csv"),
task_type="binary",
roles={"target": "Bankrupt?"}
)
}


def get_dataset_attrs(name: str):
return (
DATASETS[name]['path'],
DATASETS[name]['task_type'],
DATASETS[name]['roles'],
# to assure that LAMA correctly interprets certain columns as categorical
DATASETS[name].get('dtype', dict()),
)
def get_dataset(name: str) -> Dataset:
assert name in DATASETS, f"Unknown dataset: {name}. Known datasets: {list(DATASETS.keys())}"
return DATASETS[name]


def prepare_test_and_train(
dataset: Dataset,
seed: int,
test_size: float = 0.2
) -> Tuple[SparkDataFrame, SparkDataFrame]:
assert 0 <= test_size <= 1

spark = get_current_session()

def prepare_test_and_train(spark: SparkSession, path: str, seed: int) -> Tuple[SparkDataFrame, SparkDataFrame]:
execs = int(spark.conf.get('spark.executor.instances', '1'))
cores = int(spark.conf.get('spark.executor.cores', '8'))

data = spark.read.csv(path, header=True, escape="\"")
data = dataset.load()

data = data.repartition(execs * cores).cache()
data.write.mode('overwrite').format('noop').save()

train_data, test_data = data.randomSplit([0.8, 0.2], seed)
train_data, test_data = data.randomSplit([1 - test_size, test_size], seed)
train_data = train_data.cache()
test_data = test_data.cache()
train_data.write.mode('overwrite').format('noop').save()
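With the frozen Dataset dataclass above, registering an extra dataset and loading it takes only a few lines. A hypothetical entry (file name and target column invented for illustration; load() requires an active Spark session):

```python
from examples_utils import DATASETS, Dataset, ds_path, prepare_test_and_train

# Hypothetical dataset registration; ds_path resolves against BASE_DATASETS_PATH
my_dataset = Dataset(
    path=ds_path("my_table.csv"),
    task_type="binary",
    roles={"target": "label"},
)
DATASETS["my_dataset"] = my_dataset

df = my_dataset.load()  # reads with the dataset's file_format and options via the current session
train_df, test_df = prepare_test_and_train(my_dataset, seed=42, test_size=0.2)
```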
@@ -117,14 +135,18 @@ def get_spark_session(partitions_num: Optional[int] = None):
if os.environ.get("SCRIPT_ENV", None) == "cluster":
spark_sess = SparkSession.builder.getOrCreate()
else:

# Be aware, this is an alternative way to supply SLAMA with its jars using a Maven repository
# Example requesting both synapseml and SLAMA jar from Maven Central
# .config("spark.jars.packages",
# "com.microsoft.azure:synapseml_2.12:0.9.5,io.github.fonhorst:spark-lightautoml_2.12:0.1.1")

spark_sess = (
SparkSession
.builder
.master("local[4]")
# .config("spark.jars.packages",
# "com.microsoft.azure:synapseml_2.12:0.9.5,io.github.fonhorst:spark-lightautoml_2.12:0.1")
.master(f"local[{partitions_num}]")
.config("spark.jars.packages", "com.microsoft.azure:synapseml_2.12:0.9.5")
.config("spark.jars", "jars/spark-lightautoml_2.12-0.1.jar")
.config("spark.jars", "jars/spark-lightautoml_2.12-0.1.1.jar")
.config("spark.jars.repositories", "https://mmlspark.azureedge.net/maven")
.config("spark.driver.extraJavaOptions", "-Dio.netty.tryReflectionSetAccessible=true")
.config("spark.executor.extraJavaOptions", "-Dio.netty.tryReflectionSetAccessible=true")
@@ -169,3 +191,69 @@ def get_persistence_manager(name: Optional[str] = None):
f"Values for the following arguments have not been found: {none_val_args}"

return clazz(**ctr_arg_vals)


class FSOps:
"""
Set of typical fs operations independent of the fs implementation
see docs at: https://hadoop.apache.org/docs/current/api/org/apache/hadoop/fs/FileSystem.html
"""
@staticmethod
def get_sc() -> SparkContext:
spark = get_current_session()
sc = spark.sparkContext
return sc

@staticmethod
def get_default_fs() -> str:
spark = get_current_session()
hadoop_conf = spark._jsc.hadoopConfiguration()
default_fs = hadoop_conf.get("fs.defaultFS")
return default_fs

@classmethod
def get_fs(cls, path: str):
sc = cls.get_sc()

URI = sc._jvm.java.net.URI
FileSystem = sc._jvm.org.apache.hadoop.fs.FileSystem
Configuration = sc._jvm.org.apache.hadoop.conf.Configuration

path_uri = URI(path)
scheme = path_uri.getScheme()
if scheme:
authority = path_uri.getAuthority() or ''
fs_uri = f'{scheme}:/{authority}'
else:
fs_uri = cls.get_default_fs()

fs = FileSystem.get(URI(fs_uri), Configuration())

return fs

@classmethod
def exists(cls, path: str) -> bool:
sc = cls.get_sc()
Path = sc._jvm.org.apache.hadoop.fs.Path
fs = cls.get_fs(path)
return fs.exists(Path(path))

@classmethod
def create_dir(cls, path: str):
sc = cls.get_sc()
Path = sc._jvm.org.apache.hadoop.fs.Path
fs = cls.get_fs(path)
fs.mkdirs(Path(path))

@classmethod
def delete_dir(cls, path: str) -> bool:
sc = cls.get_sc()
Path = sc._jvm.org.apache.hadoop.fs.Path
fs = cls.get_fs(path)
return fs.delete(Path(path))


def check_columns(original_df: SparkDataFrame, predicts_df: SparkDataFrame):
absent_columns = set(original_df.columns).difference(predicts_df.columns)
assert len(absent_columns) == 0, \
f"Some columns of the original dataframe is absent from the processed dataset: {absent_columns}"
