In [1]:
from cerebro.backend import SparkBackend
from cerebro.keras import SparkEstimator

# Data storage for intermediate data and model artifacts.
from cerebro.storage import LocalStore, HDFSStore

# Model selection/AutoML methods.
from cerebro.tune import GridSearch, RandomSearch, TPESearch

# Utility functions for specifying the search space.
from cerebro.tune import hp_choice, hp_uniform, hp_quniform, hp_loguniform, hp_qloguniform

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from pyspark.sql import SparkSession
import numpy as np
import os

# Point both the PySpark workers and the driver at the same Python 3.6 binary.
# This has to happen before the SparkSession below is created.
os.environ.update({
    "PYSPARK_PYTHON": '/usr/bin/python3.6',
    "PYSPARK_DRIVER_PYTHON": '/usr/bin/python3.6',
})

from pyspark import SparkConf

# Cluster configuration: standalone master, 16 CPUs per task, 124g per executor.
conf = (
    SparkConf()
    .setAppName('cluster')
    .setMaster('spark://10.10.1.1:7077')
    .set('spark.task.cpus', '16')
    .set('spark.executor.memory', '124g')
)
spark = SparkSession.builder.config(conf=conf).getOrCreate()

# Ship the zipped cerebro package to the executors.
spark.sparkContext.addPyFile("cerebro.zip")

# Shared working directory; the Cerebro store lives under <work_dir>/test/.
work_dir = '/var/nfs/'
backend = SparkBackend(spark_context=spark.sparkContext, num_workers=6)
store = LocalStore(prefix_path=os.path.join(work_dir, 'test/'))

CEREBRO => Time: 2021-12-06 05:30:50, Running 6 Workers


In [2]:
from keras_tuner.engine import hyperparameters
import autokeras as ak
from cerebro.nas.hphpmodel import HyperHyperModel

# Search space: structured-data input -> dense block -> classification head.
input_node = ak.StructuredDataInput()
dense_node = ak.DenseBlock()(input_node)
output_node = ak.ClassificationHead()(dense_node)

am = HyperHyperModel(input_node, output_node, seed=2500)

# Attach the Spark backend, the store and the column layout of the dataset.
am.resource_bind(
    backend=backend,
    store=store,
    feature_columns=["features"],
    label_columns=['labels'],
    evaluation_metric='accuracy',
)

# Configure the tuner: greedy search, 20 trials, optimising validation accuracy.
am.tuner_bind(
    tuner="greedy",
    hyperparameters=None,
    objective="val_accuracy",
    max_trials=20,
    overwrite=True,
    exploration=0.3,
)

In [3]:
# Shorthand for the model-selection object held by the HyperHyperModel.
ms = am.model_selection

# The call returns four values; only the third one (metadata) is kept.
_, _, metadata, _ = ms.backend.get_metadata_from_parquet(
    ms.store,
    ms.label_cols,
    ms.feature_cols,
)

# Workers are initialised before the data loaders, as in the original cell.
ms.backend.initialize_workers()

# Data loaders are set up for the feature columns followed by the label columns.
loader_cols = ms.feature_cols + ms.label_cols
ms.backend.initialize_data_loaders(ms.store, None, loader_cols)

In [4]:
# Ask the backend for a pair of readers (names suggest train / validation).
# NOTE(review): the meaning of the argument 40 is not visible in this notebook —
# confirm against data_readers_fn and consider naming it as a constant.
train_reader, val_reader = ms.backend.data_readers_fn(40)




In [4]:
# Target row counts for the reduced train / test samples.
TRAIN_NUM = 100
TEST_NUM = 100

# Both splits are read from the Criteo parquet directory under work_dir.
# (Alternative training source kept for reference: '/var/nfs/tmp/data/train.parquet'.)
criteo_dir = work_dir + 'criteo/parquet/'
train_df = spark.read.parquet(criteo_dir + 'train/train_0.parquet')
test_df = spark.read.parquet(criteo_dir + 'valid/valid_0.parquet')

# Full row counts of each split (each count() triggers a Spark job).
train_row_nums, test_row_nums = train_df.count(), test_df.count()

# Fraction of each split that the target row counts represent.
train_data_ratio, test_data_ratio = TRAIN_NUM / train_row_nums, TEST_NUM / test_row_nums

print("Use {:%} of training data, with {} rows in the original data".format(train_data_ratio, train_row_nums))
print("Use {:%} of testing data, with {} rows in the original data".format(test_data_ratio, test_row_nums))

Use 0.006157% of training data, with 1624157 rows in the original data
Use 0.055420% of testing data, with 180439 rows in the original data


In [7]:
# Print the schema of the training split, then of the test split.
for split_df in (train_df, test_df):
    split_df.printSchema()

root
 |-- features: array (nullable = true)
 |    |-- element: double (containsNull = true)
 |-- labels: array (nullable = true)
 |    |-- element: long (containsNull = true)

root
 |-- features: array (nullable = true)
 |    |-- element: double (containsNull = true)
 |-- labels: array (nullable = true)
 |    |-- element: long (containsNull = true)



In [6]:
from pyspark.sql.functions import rand

# limit() keeps TRAIN_NUM / TEST_NUM rows without any explicit ordering, so the
# subset is not a random sample. Uncomment the orderBy(rand()) lines below to
# shuffle the rows before limiting.
# train_df = train_df.orderBy(rand())
# test_df = test_df.orderBy(rand())
train_df_lm = train_df.limit(TRAIN_NUM)
test_df_lm = test_df.limit(TEST_NUM)

# The default save mode raises if the target path already exists, which makes
# this cell fail on every re-run. "overwrite" replaces the previous subset so
# the cell survives Restart & Run All.
train_df_lm.write.mode("overwrite").parquet(work_dir + "limit/criteo/train.parquet")
test_df_lm.write.mode("overwrite").parquet(work_dir + "limit/criteo/test.parquet")

In [10]:
from cerebro.backend.spark.util import _get_dataset_info
# Resolve the training-data path from the store; None is passed as the only argument.
train_data_path = store.get_train_data_path(None)
# Open that path as a parquet dataset through the model-selection object's store.
train_data = ms.store.get_parquet_dataset(train_data_path)
# Arrow schema of the dataset; it is not used again in the visible cells.
schema = train_data.schema.to_arrow_schema()
# _get_dataset_info is a private helper (leading underscore). It returns two values here;
# the names suggest a row count and a total size in bytes — TODO confirm against the helper.
train_rows, total_byte_size = _get_dataset_info(train_data, 'training', train_data_path)

In [13]:
# Display the first value returned by _get_dataset_info for the training data.
train_rows

12993256

In [2]:
from keras_tuner.engine import hyperparameters
import autokeras as ak
from cerebro.nas.hphpmodel import HyperHyperModel

# NOTE(review): this cell repeats the search-space definition of the earlier
# In [2] cell; running it rebuilds `am` from scratch.
# Search space: structured-data input -> dense block -> classification head.
input_node = ak.StructuredDataInput()
hidden_node = ak.DenseBlock()(input_node)
output_node = ak.ClassificationHead()(hidden_node)

am = HyperHyperModel(input_node, output_node, seed=2500)

# Bind the execution resources and the dataset column names.
am.resource_bind(
    backend=backend,
    store=store,
    feature_columns=["features"],
    label_columns=['labels'],
    evaluation_metric='accuracy',
)

# Greedy tuner, at most 20 trials, maximising validation accuracy.
am.tuner_bind(
    tuner="greedy",
    hyperparameters=None,
    objective="val_accuracy",
    max_trials=20,
    overwrite=True,
    exploration=0.3,
)

In [None]:
import json
import os

# Run the search on the full training DataFrame for 10 epochs.
rel = am.fit(train_df, epochs=10)

# Copy every per-model metric except the 'trial' entry
# (presumably because the trial object is not JSON-serialisable — confirm).
m = {
    model: {
        key: rel.metrics[model][key]
        for key in rel.metrics[model]
        if key != 'trial'
    }
    for model in rel.metrics
}

# Serialise first, so a JSON error cannot leave a truncated metrics file behind.
metrics_json = json.dumps(m)

# Create the output directory if needed: without this, a missing directory
# raises FileNotFoundError only after the expensive fit has already finished.
metrics_path = "criteo_nas_dev/metrics.txt"
os.makedirs(os.path.dirname(metrics_path), exist_ok=True)
with open(metrics_path, "w") as metrics_file:
    metrics_file.write(metrics_json)

CEREBRO => Time: 2021-12-04 06:14:15, Preparing Data
CEREBRO => Time: 2021-12-04 06:14:15, Num Partitions: 1
CEREBRO => Time: 2021-12-04 06:14:15, Writing DataFrames
CEREBRO => Time: 2021-12-04 06:14:15, Train Data Path: file:///var/nfs/test/intermediate_train_data
CEREBRO => Time: 2021-12-04 06:14:15, Val Data Path: file:///var/nfs/test/intermediate_val_data
