In [None]:
!pip install https://mlplatform.hb.ru-msk.vkcs.cloud/mlplatform_client.tar.gz

In [None]:
import os
from mlplatform_client import MLPlatform

#Задаем токен
#Самый первый токен с правами Администратора нужно получить в ЛК VK Cloud, 
#далее токенами можно управлять с помощью библиотеки
REFRESH_TOKEN = "REPLACE_WITH_YOUR_TOKEN"
mlp = MLPlatform(refresh_token=REFRESH_TOKEN)

#Получаем список кластеров
clusters = mlp.get_clusters()

#Выводим детальную информацию по кластерам
print(clusters)
print(8*"*")

#К различным параметрам можно обращаться из кода, что позволяет строить автоматические пайплайны
CLUSTER_ID = clusters[0].id
BUCKET_NAME = clusters[0].s3_bucket_name
 
print(CLUSTER_ID)
print(BUCKET_NAME)

In [None]:
#Получаем информацию о токене, который используем
mlp.who_am_i()

In [None]:
#Так можно получить информацию о всех токенах в проекте
mlp.get_project_tokens_info()

In [None]:
#Задаем имя нового приложения для запуска
#Каждое новое Spark приложение, запускаемое на кластере, должно иметь уникальное имя
#При попытке запустить несколько приложений с одним именем вы получите ошибку
#Для исправления смените имя приложения с помощью метода client_manifest.job_name = 'new-app-name'
JOB_NAME = "spark-pi-1"

#Получаем манифест и задаем имя приложения
client_manifest = mlp.get_default_manifest(cluster_id=CLUSTER_ID, job_name=JOB_NAME)

#Меняем дефолтные параметры на желаемые
client_manifest.set_executor_settings(
    {"instances": 1, "cores":1}
)

#Задаем имя файла для запуска. Данный файл включен в докер образ по умолчанию для тестовых целей.
client_manifest.main_app_file="local:///opt/spark/examples/src/main/python/pi.py"

In [None]:
#Выводим манифест и проверяем настройки
print(client_manifest)

In [None]:
#Запускам Spark приложение
mlp.spark_submit_job(cluster_id=CLUSTER_ID, manifest=client_manifest)

In [None]:
#Получаем информацию о запущенном приложении
#На этом этапе можно получить ошибку. В случае ошибки подождите 30-60 секунд и перезапустите cell
#Ошибка может быть связана с тем, что ваше приложение еще не успело запуститься 
#соответственно библиотека не может получить информацию о приложении

job_info = mlp.spark_job_info(CLUSTER_ID, JOB_NAME)
print(job_info)
print(job_info.pod_status)
print(job_info.pod_status_reason)
print(job_info.pod_state)
print(job_info.pod_state_reason)
print(job_info.pod_state_reason_message)

In [None]:
#Выводим список всех запущенных приложений
print('\nСписок всех запущенных приложений')
all_jobs = mlp.spark_jobs_list(CLUSTER_ID)
print(all_jobs)

In [None]:
#Получаем список событий на кластере. Необходимо для поиска ошибок, понимания текущего состояния кластера
events = mlp.spark_events(CLUSTER_ID)
print(events)

In [None]:
#Выводим логи нужного приложения
logs = mlp.spark_job_logs(cluster_id=CLUSTER_ID, job_name=JOB_NAME)
print(logs)

In [None]:
#Меняем имя приложения
JOB_NAME = 'spark-pi-2'
client_manifest.job_name = JOB_NAME

#Меняем дефолтные параметры на желаемые
#Меняем число Spark executors и параметры executors
client_manifest.set_executor_settings(
    {"instances": 2, "coreRequest": "1000m", "coreLimit": "3000m", "memory":"1024m"}
)

#меняем настройки Spark driver
client_manifest.set_driver_settings(
    {"coreLimit": "2", "coreRequest": "1000m", "coreLimit": "3000m", "memory":"1024m"}
)

#Сохраняем манифест для редактирования в будущем или для загрузки в случае повторного использования
client_manifest.save_yaml("test_app.yaml")

#Загружаем манифест из из yaml файла
client_manifest = mlp.get_manifest_from_yaml_file(yaml_file_path="test_app.yaml")

In [None]:
print(client_manifest)

In [None]:
#Запускам Spark приложение на основе загруженного манифеста
mlp.spark_submit_job(cluster_id=CLUSTER_ID, manifest=client_manifest)

In [None]:
#Получаем список событий на кластере. Необходимо для поиска ошибок, понимания текущего состояния кластера
events = mlp.spark_events(CLUSTER_ID)
print(events)

In [None]:
#Выводим логи приложения
logs = mlp.spark_job_logs(cluster_id=CLUSTER_ID, job_name=JOB_NAME)
print(logs)

In [None]:
#Генерируем тестовый датасет и сохраняем его в S3
JOB_NAME = 'email-gen-1'
client_manifest.job_name = JOB_NAME

#Меняем дефолтные параметры на желаемые
#Меняем число Spark executors и параметры executors
client_manifest.set_executor_settings(
    {"instances": 2, "coreRequest": "1000m", "coreLimit": "2000m", "memory":"1024m"}
)

#меняем настройки Spark driver
client_manifest.set_driver_settings(
    {"coreRequest": "1000m", "coreLimit": "2000m", "memory":"1024m"}
)

mlp.spark_submit_job(cluster_id=CLUSTER_ID, manifest=client_manifest, pycode_file_path="email_generation.py")

#Сохраняем манифест для редактирования в будущем или для загрузки в случае повторного использования
client_manifest.save_yaml("email_gen.yaml")

In [None]:
#Получаем список событий на кластере
events = mlp.spark_events(CLUSTER_ID)
print(events)

In [None]:
#Выводим логи приложения
logs = mlp.spark_job_logs(cluster_id=CLUSTER_ID, job_name=JOB_NAME)
print(logs)

In [None]:
#Выводим список всех запущенных приложений
print('\nСписок всех запущенных приложений')
all_jobs = mlp.spark_jobs_list(CLUSTER_ID)
print(all_jobs)

In [None]:
#Если приложение упало с ошибкой, его можно удалить и запустить заново после правок
#mlp.spark_delete_job(cluster_id=CLUSTER_ID, job_name=JOB_NAME)

In [None]:
#Env, S3
#Параметризуем наше приложение для генерирования датасета
#Не хардкодим путь для записи в S3, указываем его как переменную окружения

In [None]:
#Меняем имя приложения
JOB_NAME = 'email-gen-env-1'
client_manifest = mlp.get_default_manifest(cluster_id=CLUSTER_ID, job_name=JOB_NAME)

client_manifest.job_name = JOB_NAME

#Меняем дефолтные параметры на желаемые
#Меняем число Spark executors и параметры executors
client_manifest.set_executor_settings(
    {"instances": 2, "coreRequest": "1000m", "coreLimit": "2000m", "memory":"1024m"}
)

#меняем настройки Spark driver
client_manifest.set_driver_settings(
    {"coreRequest": "1000m", "coreLimit": "2000m", "memory":"1024m"}
)

#Задаем переменные окружения для driver
client_manifest.add_driver_env(
    [{"name": "S3_WRITE_PATH", "value": "s3a://REPLACE_WITH_YOUR_BUCKET/datasets/email_env"}]
)

#Задаем переменные окружения для executors
client_manifest.add_executor_env(
    [{"name": "S3_WRITE_PATH", "value": "s3a://REPLACE_WITH_YOUR_BUCKET/datasets/email_env"}]
)

#Имя бакета, подключенного по умолчанию к данному кластеру Spark можно найти с помощью команд или в UI VK Cloud
#Получаем список кластеров
#clusters = mlp.get_clusters()
#BUCKET_NAME = clusters[0].s3_bucket_name
#print(BUCKET_NAME)


#Указываем файл для запуска приложения в S3
#Предварительно нужно сохранить файл email_generation_env.py в S3
client_manifest.main_app_file = f"s3a://REPLACE_WITH_YOUR_BUCKET/spark-files/email_generation_env.py"

mlp.spark_submit_job(cluster_id=CLUSTER_ID, manifest=client_manifest)

#Сохраняем манифест для редактирования в будущем или для загрузки в случае повторного использования
#client_manifest.save_yaml("email_gen_env.yaml")

In [None]:
#print(client_manifest)

In [None]:
#Выводим логи приложения
logs = mlp.spark_job_logs(cluster_id=CLUSTER_ID, job_name=JOB_NAME)
print(logs)

In [None]:
#Выводим список всех запущенных приложений
print('\nСписок всех запущенных приложений')
all_jobs = mlp.spark_jobs_list(CLUSTER_ID)
print(all_jobs)

In [None]:
#mlp.spark_delete_job(cluster_id=CLUSTER_ID, job_name=JOB_NAME)

In [None]:
#S3, ClickHouse
#Продолжаем добавлять параметры, учимся добавлять зависимости для работы, например, с ClickHouse
#Для данного примера мы создаем инстанс ClickHouse в UI VK Cloud
https://cloud.vk.com/docs/ru/dbs/dbaas
https://cloud.vk.com/docs/ru/dbs/dbaas/instructions/create/create-single-replica 

In [None]:
#Меняем имя приложения
JOB_NAME = 'email-to-s3-ch-1'
client_manifest = mlp.get_default_manifest(cluster_id=CLUSTER_ID, job_name=JOB_NAME)

client_manifest.job_name = JOB_NAME

#Меняем дефолтные параметры на желаемые
#Меняем число Spark executors и параметры executors
client_manifest.set_executor_settings(
    {"instances": 2, "coreRequest": "1000m", "coreLimit": "2000m", "memory":"1024m"}
)

#меняем настройки Spark driver
client_manifest.set_driver_settings(
    {"coreRequest": "1000m", "coreLimit": "2000m", "memory":"1024m"}
)

#Задаем переменные окружения для driver
client_manifest.add_driver_env(
    [
        {"name": "S3_INPUT_PATH", "value": "s3a://REPLACE_WITH_YOUR_BUCKET/datasets/email"},
        {"name": "S3_OUTPUT_CORRECT_PATH", "value": "s3a://kREPLACE_WITH_YOUR_BUCKET/datasets/correct_email"},
        {"name": "S3_OUTPUT_INCORRECT_PATH", "value": "s3a://REPLACE_WITH_YOUR_BUCKET/datasets/incorrect_email"},
        {"name": "CH_OUTPUT_CORRECT_PATH", "value": "correct_emails"}
    ]
)

#Задаем переменные окружения для executors
client_manifest.add_executor_env(
    [
        {"name": "S3_INPUT_PATH", "value": "s3a://k8s-3c6cfde5e7796c-bucket/datasets/email"},
        {"name": "S3_OUTPUT_CORRECT_PATH", "value": "s3a://REPLACE_WITH_YOUR_BUCKET/datasets/correct_email"},
        {"name": "S3_OUTPUT_INCORRECT_PATH", "value": "s3a://REPLACE_WITH_YOUR_BUCKET/datasets/incorrect_email"},
        {"name": "CH_OUTPUT_CORRECT_PATH", "value": "correct_emails"}
    ]
)

#Указываем файл для запуска приложения в S3
client_manifest.main_app_file = f"s3a://REPLACE_WITH_YOUR_BUCKET/spark-files/email_to_s3_ch.py"

# Добавим в зависимости jar-файл из s3-бакета
#Jar необходимо заранее скопировать в бакет
client_manifest.add_jars(["s3a://REPLACE_WITH_YOUR_BUCKET/spark-files/clickhouse-jdbc-0.5.0-shaded.jar"])

mlp.spark_submit_job(
    cluster_id=CLUSTER_ID, 
    manifest=client_manifest, 
)

In [None]:
#Выводим логи приложения
logs = mlp.spark_job_logs(cluster_id=CLUSTER_ID, job_name=JOB_NAME)
print(logs)

In [None]:
#Получаем список событий на кластере
events = mlp.spark_events(CLUSTER_ID)
print(events)

In [None]:
#Выводим список всех запущенных приложений
print('\nСписок всех запущенных приложений')
all_jobs = mlp.spark_jobs_list(CLUSTER_ID)
print(all_jobs)

In [None]:
#mlp.spark_delete_job(cluster_id=CLUSTER_ID, job_name=JOB_NAME)

In [None]:
#Использование Secret для хранения Credentials
#В данном примере мы учимся использовать Secret для хранения чувствительной информации, например, пароли

In [None]:
#Работа с Secrets
#Получаем список Secrets в кластере
mlp.list_secrets(cluster_id=CLUSTER_ID)

In [None]:
#Создаем Secret из yaml
#Пример yaml можно найти в репозитории
mlp.create_secret_from_yaml(cluster_id=CLUSTER_ID, secret_yaml_path='ch_cred.yaml')

In [None]:
#Получаем информацию о созданном Secret
mlp.get_secret_detail(cluster_id=CLUSTER_ID, secret_name='ch-secret')

In [None]:
#Удаляем Secret, если необходимо
#mlp.delete_secret(cluster_id=CLUSTER_ID, secret_name='ch-secret')

In [None]:
#Меняем имя приложения
JOB_NAME = 'email-to-s3-ch-secret-1'
client_manifest = mlp.get_default_manifest(cluster_id=CLUSTER_ID, job_name=JOB_NAME)

client_manifest.job_name = JOB_NAME

#Меняем дефолтные параметры на желаемые
#Меняем число Spark executors и параметры executors
client_manifest.set_executor_settings(
    {"instances": 2, "coreRequest": "1000m", "coreLimit": "2000m", "memory":"1024m"}
)

#меняем настройки Spark driver
client_manifest.set_driver_settings(
    {"coreLimit": "2", "coreRequest": "1000m", "coreLimit": "2000m", "memory":"1024m"}
)

#Задаем переменные окружения для driver
client_manifest.add_driver_env(
    [
        {"name": "S3_INPUT_PATH", "value": "s3a://REPLACE_WITH_YOUR_BUCKET/datasets/email"},
        {"name": "S3_OUTPUT_CORRECT_PATH", "value": "s3a://REPLACE_WITH_YOUR_BUCKET/datasets/correct_emails_using_secrets"},
        {"name": "S3_OUTPUT_INCORRECT_PATH", "value": "s3a://REPLACE_WITH_YOUR_BUCKET/datasets/incorrect_email"},
        {"name": "CH_OUTPUT_CORRECT_PATH", "value": "correct_emails_using_secrets"}
    ]
)

#Задаем переменные окружения для executors
client_manifest.add_executor_env(
    [
        {"name": "S3_INPUT_PATH", "value": "s3a://REPLACE_WITH_YOUR_BUCKET/datasets/email"},
        {"name": "S3_OUTPUT_CORRECT_PATH", "value": "s3a://REPLACE_WITH_YOUR_BUCKET/datasets/correct_emails_using_secrets"},
        {"name": "S3_OUTPUT_INCORRECT_PATH", "value": "s3a://REPLACE_WITH_YOUR_BUCKET/datasets/incorrect_email"},
        {"name": "CH_OUTPUT_CORRECT_PATH", "value": "correct_emails_using_secrets"}
    ]
)

#Задаем переменные окружения из Secret для Driver и Executors
client_manifest.add_driver_env_from([{"secretRef": {"name": "ch-secret"}}])
client_manifest.add_executor_env_from([{"secretRef": {"name": "ch-secret"}}])

#Указываем файл для запуска приложения в S3
client_manifest.main_app_file = f"s3a://REPLACE_WITH_YOUR_BUCKET/spark-files/email_to_s3_ch_secret.py"

# Добавим в зависимости jar-файл из s3-бакета
client_manifest.add_jars(["s3a://REPLACE_WITH_YOUR_BUCKET/spark-files/clickhouse-jdbc-0.5.0-shaded.jar"])

mlp.spark_submit_job(
    cluster_id=CLUSTER_ID, 
    manifest=client_manifest, 
)


In [None]:
#Выводим логи приложения
logs = mlp.spark_job_logs(cluster_id=CLUSTER_ID, job_name=JOB_NAME)
print(logs)

In [None]:
#Выводим список всех запущенных приложений
print('\nСписок всех запущенных приложений')
all_jobs = mlp.spark_jobs_list(CLUSTER_ID)
print(all_jobs)

In [None]:
#mlp.spark_delete_job(cluster_id=CLUSTER_ID, job_name=JOB_NAME)

In [None]:
#Troubleshooting
#Учимся искать баги, отлаживать Spark приложения в K8s
#Для этого используем следующие основные методы

#Получаем общую информацию о статусе приложения
#job_info = mlp.spark_job_info(cluster_id=CLUSTER_ID, job_name=JOB_NAME)
#print(job_info)

#Выводим детальную информацию о приложении со списком событий, связанных с приложением
#job_details = mlp.describe_spark_job(cluster_id=CLUSTER_ID, job_name=JOB_NAME)
#print(job_details)

#Выводим логи приложения
#logs = mlp.spark_job_logs(cluster_id=CLUSTER_ID, job_name=JOB_NAME)
#print(logs)

#Получаем список событий на кластере. Ищем события, связанные с нашим приложением или состоянием кластера
#events = mlp.spark_events(CLUSTER_ID)
#print(events)

In [None]:
#Меняем имя приложения
JOB_NAME = 'email-to-s3-ch-secret-2'
client_manifest = mlp.get_default_manifest(cluster_id=CLUSTER_ID, job_name=JOB_NAME)

client_manifest.job_name = JOB_NAME

#Меняем дефолтные параметры на желаемые
#Меняем число Spark executors и параметры executors
#Задаем слишком высокие требования по ядрам или памяти для того, чтобы приложение не запустилось 
#на этом примере учимся искать ошибку в событиях, логах
client_manifest.set_executor_settings(
    {"instances": 4, "coreRequest": "15000m", "coreLimit": "16000m", "memory":"11024m"}
)

#меняем настройки Spark driver
client_manifest.set_driver_settings(
    {"coreLimit": "2", "cores": 2, "memory":"1024m"}
)

#Задаем переменные окружения для driver
client_manifest.add_driver_env(
    [
        {"name": "S3_INPUT_PATH", "value": "s3a://REPLACE_WITH_YOUR_BUCKET/datasets/email"},
        {"name": "S3_OUTPUT_CORRECT_PATH", "value": "s3a://REPLACE_WITH_YOUR_BUCKET/datasets/correct_emails_using_secrets"},
        {"name": "S3_OUTPUT_INCORRECT_PATH", "value": "s3a://REPLACE_WITH_YOUR_BUCKET/datasets/incorrect_email"},
        {"name": "CH_OUTPUT_CORRECT_PATH", "value": "correct_emails_using_secrets"}
    ]
)

#Задаем переменные окружения для executors
client_manifest.add_executor_env(
    [
        {"name": "S3_INPUT_PATH", "value": "s3a://REPLACE_WITH_YOUR_BUCKET/datasets/email"},
        {"name": "S3_OUTPUT_CORRECT_PATH", "value": "s3a://REPLACE_WITH_YOUR_BUCKET/datasets/correct_emails_using_secrets"},
        {"name": "S3_OUTPUT_INCORRECT_PATH", "value": "s3a://REPLACE_WITH_YOUR_BUCKET/datasets/incorrect_email"},
        {"name": "CH_OUTPUT_CORRECT_PATH", "value": "correct_emails_using_secrets"}
    ]
)

#Задаем переменные окружения из Secret для Driver и Executors
client_manifest.add_driver_env_from([{"secretRef": {"name": "ch-secret"}}])
client_manifest.add_executor_env_from([{"secretRef": {"name": "ch-secret"}}])

#Указываем файл для запуска приложения в S3
client_manifest.main_app_file = f"s3a://REPLACE_WITH_YOUR_BUCKET/spark-files/email_to_s3_ch_secret.py"

# Добавим в зависимости jar-файл из s3-бакета
client_manifest.add_jars(["s3a://REPLACE_WITH_YOUR_BUCKET/spark-files/clickhouse-jdbc-0.5.0-shaded.jar"])

mlp.spark_submit_job(
    cluster_id=CLUSTER_ID, 
    manifest=client_manifest, 
)


In [None]:
print(client_manifest)

In [None]:
#Получаем общую информацию о статусе приложения
job_info = mlp.spark_job_info(cluster_id=CLUSTER_ID, job_name=JOB_NAME)
print(job_info)

In [None]:
#Выводим детальную информацию о приложении со списком событий, связанных с приложением
#Изучаем данные в events
job_details = mlp.describe_spark_job(cluster_id=CLUSTER_ID, job_name=JOB_NAME)
print(job_details)

In [None]:
#Выводим логи приложения
#В логах должно быть видно, что приложение зависло
logs = mlp.spark_job_logs(cluster_id=CLUSTER_ID, job_name=JOB_NAME)
print(logs)

In [None]:
#Получаем список событий на кластере
#В списке событий на кластере ищем события, относящиеся к приложению и находим ошибку
#В events должно появится сообщение относительно нехватки ресурсов для запуска Spark Executors
events = mlp.spark_events(CLUSTER_ID)
print(events)

In [None]:
#Выводим список всех запущенных приложений
print('\nСписок всех запущенных приложений')
all_jobs = mlp.spark_jobs_list(CLUSTER_ID)
print(all_jobs)

In [None]:
#Удаляем приложение после завершения обучения
mlp.spark_delete_job(cluster_id=CLUSTER_ID, job_name=JOB_NAME)

In [None]:
#Для получения помощи можно использовать методы, указанные ниже и документацию
https://cloud.vk.com/docs/ru/ml/spark-to-k8s

In [None]:
#Выводим help по конкретному методу
help(client_manifest.set_executor_settings)

In [None]:
#Выводим help по объекту client_manifest
help(client_manifest)

In [None]:
#Выводим help по всей библиотеке
import mlplatform_client

help(mlplatform_client)