# Optimization script

In [2]:
# Full search space of Spark settings: config key -> candidate values.
# Sizes are unit-less numbers; start_spark_session appends the units
# (MB for Kryo buffers, GB for memory, seconds for the network timeout).
param_grid = dict([
    # For these five settings a single value should be chosen for all cases.
    ('spark.serializer', ['org.apache.spark.serializer.KryoSerializer',
                          'org.apache.spark.serializer.JavaSerializer']),
    ('spark.kryoserializer.buffer.max', [32, 64, 128, 256]),
    ('spark.kryoserializer.buffer', [32, 64, 128, 256]),
    ('spark.executor.cores', [2, 3, 4, 5]),
    ('spark.network.timeout', [120, 500, 1000, 3000]),
    # Memory sizing.
    ('spark.driver.memory', [2, 4, 8, 16]),
    ('spark.executor.memory', [8, 16, 24, 32]),
    ('spark.executor.memoryOverhead', [1, 2, 4, 8]),
    ('spark.memory.fraction', [0.6, 0.7, 0.8, 0.9]),
    ('spark.memory.storageFraction', [0.5, 0.6, 0.7, 0.8]),
    # Shuffle.
    ('spark.shuffle.file.buffer', [64, 128, 256, 512]),
    # Not searched (fixed in start_spark_session): spark.shuffle.manager='sort',
    # spark.shuffle.compress=true, spark.shuffle.spill.compress=true.
])

In [3]:
def iterator_dict(**kwargs):
    """Yield every combination of the keyword arguments' candidate values.

    Each keyword maps to an iterable of candidate values. One dict is yielded
    per element of their Cartesian product, with keys in argument order.
    The first keyword varies slowest and the last fastest, e.g.
    ``iterator_dict(a=[1, 2], b=[3, 4])`` yields ``{'a': 1, 'b': 3}``,
    ``{'a': 1, 'b': 4}``, ``{'a': 2, 'b': 3}``, ``{'a': 2, 'b': 4}``.

    Edge cases:
    - With no keywords, a single empty dict is yielded (the empty product).
    - If any candidate iterable is empty, nothing is yielded.
    - Candidate iterables are consumed once up front, so one-shot iterators
      (generators) produce the full product too.
    """
    # Local import keeps this notebook cell self-contained.
    from itertools import product

    keys = list(kwargs)
    for combination in product(*kwargs.values()):
        yield dict(zip(keys, combination))

# для расчета количества итераций
#i = 0
#param_grid_iter = iterator_dict(**param_grid)
#for item in param_grid_iter:
#  i+=1
#print(i)

In [5]:
def start_spark_session(
                        application_name='gsa___bla bla bla pyu',
                        type_serializer='org.apache.spark.serializer.JavaSerializer',
                        kryo_buffer_max=32,   kryo_buffer=32,              executor_cores=1,
                        driver_memory=1,      executor_memory=1,           memory_overhead=1,
                        memory_fraction=0.6,  memory_storageFraction=0.5,  shuffle_file_buffer=128,
                        network_timeout=120,  extra_conf=None):
    """Build a SparkConf from the tuning knobs below and start a YARN session.

    Numeric sizes get their unit appended here:
    - Kryo buffers in MB ('{}m').
    - Driver/executor memory and memory overhead in GB ('{}g').
    - Network timeout in seconds ('{}s').
    All other settings (queue, Hive, shuffle, dynamic allocation, HDFS access)
    are fixed for every run.

    If a session is already active, ``getOrCreate()`` returns it, and static
    settings such as executor memory or the serializer are not re-applied.
    Stop the previous session before calling this with different values
    (the benchmark cells do so).

    Args:
        application_name: YARN application name.
        type_serializer: value for ``spark.serializer``.
        kryo_buffer_max, kryo_buffer: Kryo buffer sizes in MB.
        executor_cores: ``spark.executor.cores``.
        driver_memory, executor_memory, memory_overhead: sizes in GB.
        memory_fraction, memory_storageFraction: unified-memory fractions.
        shuffle_file_buffer: ``spark.shuffle.file.buffer``, passed as is.
        network_timeout: ``spark.network.timeout`` in seconds.
        extra_conf: optional dict (or iterable of ``(key, value)`` pairs) of
            additional settings. They are applied last, so they override any
            setting above (e.g. a different ``spark.yarn.queue``).

    Returns:
        The SparkSession returned by ``getOrCreate()``.
    """
    # Settings under study, with unit suffixes added where Spark expects them.
    tuned_settings = [
        ('spark.serializer', type_serializer),
        ('spark.kryoserializer.buffer.max', '{}m'.format(kryo_buffer_max)),
        ('spark.kryoserializer.buffer', '{}m'.format(kryo_buffer)),
        ('spark.memory.fraction', memory_fraction),
        ('spark.memory.storageFraction', memory_storageFraction),
        ('spark.shuffle.file.buffer', shuffle_file_buffer),
        ('spark.executor.cores', executor_cores),
        ('spark.driver.memory', '{}g'.format(driver_memory)),
        ('spark.executor.memory', '{}g'.format(executor_memory)),
        # NOTE(review): Spark >= 2.3 names this spark.executor.memoryOverhead
        # (the key used in param_grid); the yarn-prefixed key is its
        # deprecated alias.
        ('spark.yarn.executor.memoryOverhead', '{}g'.format(memory_overhead)),
        ('spark.network.timeout', '{}s'.format(network_timeout)),
    ]
    # Fixed cluster / Hive / shuffle / dynamic-allocation settings for every run.
    fixed_settings = [
        ('spark.yarn.queue', 'team_mmb_prom_reserv'),
        ('spark.hive.mapred.supports.subdirectories', 'true'),
        ('spark.sql.hive.manageFilesourcePartitions', 'true'),
        ('spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive', 'true'),
        ('spark.sql.catalogImplementation', 'hive'),
        ('spark.hadoop.metastore.skip.load.functions.on.init', 'true'),
        ('spark.sql.optimizer.metadataOnly', 'true'),
        ('hive.exec.dynamic.partition', 'true'),
        ('hive.exec.dynamic.partition.mode', 'nonstrict'),
        ('spark.shuffle.manager', 'sort'),
        ('spark.shuffle.compress', 'true'),
        ('spark.shuffle.spill.compress', 'true'),
        ('spark.shuffle.service.enabled', 'true'),
        ('spark.sql.broadcastTimeout', '3000'),
        ('spark.driver.maxResultSize', '10g'),
        ('spark.hadoop.validateOutputSpecs', 'false'),
        ('spark.sql.codegen.wholeStage', 'true'),
        ('spark.sql.hive.convertMetastoreParquet', 'false'),
        ('spark.sql.parquet.readLegacyFormat', 'true'),
        ('spark.sql.parquet.binaryAsString', 'true'),
        ('spark.sql.sources.partitionColumnTypeInference.enabled', 'true'),
        ('spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation', 'true'),
        ('spark.dynamicAllocation.enabled', 'true'),
        ('spark.dynamicAllocation.executorIdleTimeout', '120s'),
        ('spark.dynamicAllocation.cachedExecutorIdleTimeout', '600s'),
        ('spark.dynamicAllocation.shuffleTracking.enabled', 'true'),
        ('spark.dynamicAllocation.initialExecutors', '0'),
        ('spark.dynamicAllocation.minExecutors', '0'),
        # TODO: confirm against PROD whether the maximum should be 300 or 500.
        ('spark.dynamicAllocation.maxExecutors', '300'),
        ('spark.yarn.access.hadoopFileSystems', 'hdfs://hdfsgw,hdfs://arnsdpldbr2,hdfs://arnsdpsbx,hdfs://arnsdpsmd2,hdfs://spsdpsmd,hdfs://arnsdprisk'),
    ]
    settings = tuned_settings + fixed_settings
    if extra_conf:
        # Accept a mapping or an iterable of pairs. setAll applies pairs in
        # order, so appending them last makes these values win.
        settings.extend(extra_conf.items() if hasattr(extra_conf, 'items') else extra_conf)

    conf = SparkConf()
    conf.setAll(settings)

    # NOTE(review): 'yarn-client' is a deprecated master URL (newer Spark
    # expects master 'yarn' with client deploy mode); it works on this cluster
    # per the saved outputs.
    spark = SparkSession.builder.master('yarn-client').appName(application_name).config(conf=conf).getOrCreate()
    print('Context ready: {}'.format(spark))
    return spark


## Логирование

In [6]:
import logging
import sys

# Logger for the tuning runs. Every level passes through; records are formatted
# as "[<level initial> <yymmdd HH:MM:SS>] <message>" using str.format style.
logger = logging.getLogger(name="spark_logsss")
logger.setLevel("DEBUG")
formatter = logging.Formatter("[{levelname[0]} {asctime}] {message}",
                              "%y%m%d %H:%M:%S",
                              "{")

In [7]:
#logging.getLogger("spark").handlers.clear()
#logger.handlers.clear()

In [8]:
# Attach the file handler only once. Re-running this cell would otherwise add
# another handler, and every message would be written to the file repeatedly.
if not logger.handlers:
    # The log messages in this notebook contain Cyrillic text. Pin UTF-8
    # instead of relying on the locale's default encoding, which may not be
    # able to encode it; the record would then be dropped with an error
    # printed to stderr.
    file_handler = logging.FileHandler("logs_params.txt", encoding="utf-8")
    file_handler.setFormatter(formatter)
    logger.addHandler(file_handler)
    # To also echo messages into the notebook, add
    # logging.StreamHandler(stream=sys.stdout) with the same formatter.
    
#logger.handlers

In [9]:
# Show the handlers currently attached to the logger.
list(logger.handlers)

[<FileHandler /home/18289972_omega-sbrf-ru/notebooks/logs_params.txt (NOTSET)>]

In [15]:
import time

## Исследуем 'spark.serializer' и размер буферов

In [13]:
# Grid search over the serializer and the Kryo buffer sizes.
# - Each benchmark function is timed under every combination, each in a fresh
#   SparkSession; all other settings keep their start_spark_session defaults.
# - Relies on start_calc_2 / start_calc_3 (section "Функции для исследования")
#   having been run first.
logger.info('\n'*1)
logger.info('spark.serializer')
logger.info('\n'*1)
param_grid_serializer = { 'spark.serializer'                : ['org.apache.spark.serializer.KryoSerializer', 'org.apache.spark.serializer.JavaSerializer']
                         ,'spark.kryoserializer.buffer.max' : [32, 64, 128, 256]
                         ,'spark.kryoserializer.buffer'     : [32, 64, 128, 256]
                        }
start_calc_fnc = [start_calc_3
                  ,start_calc_2]

for func in start_calc_fnc:
    logger.info(func.__name__)
    # Best (smallest) runtime seen so far for this function.
    min_time = float('inf')
    # Generators are single-use, so build a fresh one per function.
    param_grid_iter_srlzr = iterator_dict(**param_grid_serializer)
    for item in param_grid_iter_srlzr:
        # These are static settings, so the previous session must be stopped
        # first. Look `spark` up via globals(): on a fresh kernel it does not
        # exist yet, and a bare `if spark:` raises NameError.
        previous_session = globals().get('spark')
        if previous_session is not None:
            previous_session.stop()
        spark = start_spark_session(application_name = 'pi pu pa',
                                    type_serializer=item['spark.serializer'],
                                    kryo_buffer_max=item['spark.kryoserializer.buffer.max'],
                                    kryo_buffer=item['spark.kryoserializer.buffer'])
        # Measure once with a monotonic clock, so the compared and the logged
        # value are the same number.
        start_time = time.perf_counter()
        # NOTE(review): start_calc_* only build a lazy DataFrame (no action),
        # so this may time query planning rather than execution.
        func()
        elapsed = time.perf_counter() - start_time
        if elapsed <= min_time:
            min_time = elapsed
            logger.info('##########################################')
            logger.info(f'Оптимальные параметры! {min_time}')
            logger.info("Характеристики: type_serializer={}, kryo_buffer_max={}, kryo_buffer={}, остальные параметры дефолтные"
                        .format(item['spark.serializer'], item['spark.kryoserializer.buffer.max'], item['spark.kryoserializer.buffer']))
            logger.info('##########################################')
        else:
            logger.info(f'Неоптимальные параметры! {min_time}')

Context ready: <pyspark.sql.session.SparkSession object at 0x7f79c4133a90>
Context ready: <pyspark.sql.session.SparkSession object at 0x7f79c413d278>
Context ready: <pyspark.sql.session.SparkSession object at 0x7f792c89b9e8>
Context ready: <pyspark.sql.session.SparkSession object at 0x7f79c413d7f0>
Context ready: <pyspark.sql.session.SparkSession object at 0x7f79c4133470>
Context ready: <pyspark.sql.session.SparkSession object at 0x7f792c89bcc0>
Context ready: <pyspark.sql.session.SparkSession object at 0x7f79ade95b38>
Context ready: <pyspark.sql.session.SparkSession object at 0x7f79c405efd0>
Context ready: <pyspark.sql.session.SparkSession object at 0x7f79ade9b860>
Context ready: <pyspark.sql.session.SparkSession object at 0x7f79c412d080>
Context ready: <pyspark.sql.session.SparkSession object at 0x7f79ade9b128>
Context ready: <pyspark.sql.session.SparkSession object at 0x7f79ade9b5c0>
Context ready: <pyspark.sql.session.SparkSession object at 0x7f79c4133898>
Context ready: <pyspark.s

## Исследуем 'spark.executor.cores'

In [16]:
# Benchmark each calc function under every candidate 'spark.executor.cores'
# value; all other settings keep their start_spark_session defaults.
# Relies on start_calc_1..3 (section "Функции для исследования") having been
# run first.
logger.info('\n'*1)
logger.info('spark.executor.cores')
logger.info('\n'*1)
start_calc_fnc = [start_calc_1
                  ,start_calc_2
                  ,start_calc_3]

for func in start_calc_fnc:
    logger.info(func.__name__)
    # Best (smallest) runtime seen so far for this function.
    min_time = float('inf')
    for item in param_grid['spark.executor.cores']:
        # Executor cores is a static setting, so the previous session must be
        # stopped first. globals().get avoids a NameError when no `spark`
        # exists yet (fresh kernel).
        previous_session = globals().get('spark')
        if previous_session is not None:
            previous_session.stop()
        spark = start_spark_session(application_name = 'pi pu pa', executor_cores=item)
        # Measure once with a monotonic clock, so the compared and the logged
        # value are the same number.
        start_time = time.perf_counter()
        # NOTE(review): start_calc_* only build a lazy DataFrame (no action),
        # so this may time query planning rather than execution.
        func()
        elapsed = time.perf_counter() - start_time
        if elapsed <= min_time:
            min_time = elapsed
            logger.info('##########################################')
            logger.info(f'Оптимальные параметры! {min_time}')
            logger.info("Характеристики: executor_cores={}, остальные параметры дефолтные".format(item))
            logger.info('##########################################')
        else:
            logger.info(f'Неоптимальные параметры! {min_time}')

Context ready: <pyspark.sql.session.SparkSession object at 0x7f3613e59f28>
Context ready: <pyspark.sql.session.SparkSession object at 0x7f3615954198>
Context ready: <pyspark.sql.session.SparkSession object at 0x7f36b58d5630>
Context ready: <pyspark.sql.session.SparkSession object at 0x7f36995a2278>
Context ready: <pyspark.sql.session.SparkSession object at 0x7f36bafcf6a0>
Context ready: <pyspark.sql.session.SparkSession object at 0x7f3699816b00>
Context ready: <pyspark.sql.session.SparkSession object at 0x7f3699585eb8>
Context ready: <pyspark.sql.session.SparkSession object at 0x7f36995a2710>
Context ready: <pyspark.sql.session.SparkSession object at 0x7f36b58d58d0>
Context ready: <pyspark.sql.session.SparkSession object at 0x7f36998169e8>
Context ready: <pyspark.sql.session.SparkSession object at 0x7f3699816828>
Context ready: <pyspark.sql.session.SparkSession object at 0x7f36998013c8>


## Исследуем 'spark.network.timeout'

In [17]:
# Benchmark each calc function under every candidate 'spark.network.timeout'
# value; all other settings keep their start_spark_session defaults.
# Relies on start_calc_1..3 (section "Функции для исследования") having been
# run first.
logger.info('\n'*1)
logger.info('spark.network.timeout')
logger.info('\n'*1)
start_calc_fnc = [start_calc_1
                  ,start_calc_2
                  ,start_calc_3]

for func in start_calc_fnc:
    logger.info(func.__name__)
    # Best (smallest) runtime seen so far for this function.
    min_time = float('inf')
    for item in param_grid['spark.network.timeout']:
        # Start each run in a fresh session. globals().get avoids a NameError
        # when no `spark` exists yet (fresh kernel).
        previous_session = globals().get('spark')
        if previous_session is not None:
            previous_session.stop()
        spark = start_spark_session(application_name = 'pi pu pa', network_timeout=item)
        # Measure once with a monotonic clock, so the compared and the logged
        # value are the same number.
        start_time = time.perf_counter()
        # NOTE(review): start_calc_* only build a lazy DataFrame (no action),
        # so this may time query planning rather than execution.
        func()
        elapsed = time.perf_counter() - start_time
        if elapsed <= min_time:
            min_time = elapsed
            logger.info('##########################################')
            logger.info(f'Оптимальные параметры! {min_time}')
            logger.info("Характеристики: network_timeout={}, остальные параметры дефолтные".format(item))
            logger.info('##########################################')
        else:
            logger.info(f'Неоптимальные параметры! {min_time}')

Context ready: <pyspark.sql.session.SparkSession object at 0x7f79addada58>
Context ready: <pyspark.sql.session.SparkSession object at 0x7f79ade1fda0>
Context ready: <pyspark.sql.session.SparkSession object at 0x7f79addc1da0>
Context ready: <pyspark.sql.session.SparkSession object at 0x7f79addb5898>
Context ready: <pyspark.sql.session.SparkSession object at 0x7f79addce9e8>
Context ready: <pyspark.sql.session.SparkSession object at 0x7f79adda63c8>
Context ready: <pyspark.sql.session.SparkSession object at 0x7f79addb5dd8>
Context ready: <pyspark.sql.session.SparkSession object at 0x7f79addb5358>
Context ready: <pyspark.sql.session.SparkSession object at 0x7f79addbf278>
Context ready: <pyspark.sql.session.SparkSession object at 0x7f79adddb5f8>
Context ready: <pyspark.sql.session.SparkSession object at 0x7f79adda6fd0>
Context ready: <pyspark.sql.session.SparkSession object at 0x7f79ade1f940>


## Исследуем 'spark.shuffle.file.buffer'

In [18]:
# Benchmark each calc function under every candidate 'spark.shuffle.file.buffer'
# value; all other settings keep their start_spark_session defaults.
# Relies on start_calc_1..3 (section "Функции для исследования") having been
# run first.
logger.info('\n'*1)
logger.info('spark.shuffle.file.buffer')
logger.info('\n'*1)
start_calc_fnc = [start_calc_1
                  ,start_calc_2
                  ,start_calc_3]

for func in start_calc_fnc:
    logger.info(func.__name__)
    # Best (smallest) runtime seen so far for this function.
    min_time = float('inf')
    for item in param_grid['spark.shuffle.file.buffer']:
        # Start each run in a fresh session. globals().get avoids a NameError
        # when no `spark` exists yet (fresh kernel).
        previous_session = globals().get('spark')
        if previous_session is not None:
            previous_session.stop()
        spark = start_spark_session(application_name = 'pi pu pa', shuffle_file_buffer=item)
        # Measure once with a monotonic clock, so the compared and the logged
        # value are the same number.
        start_time = time.perf_counter()
        # NOTE(review): start_calc_* only build a lazy DataFrame (no action),
        # so this may time query planning rather than execution.
        func()
        elapsed = time.perf_counter() - start_time
        if elapsed <= min_time:
            min_time = elapsed
            logger.info('##########################################')
            logger.info(f'Оптимальные параметры! {min_time}')
            logger.info("Характеристики: shuffle_file_buffer={}, остальные параметры дефолтные".format(item))
            logger.info('##########################################')
        else:
            logger.info(f'Неоптимальные параметры! {min_time}')

Context ready: <pyspark.sql.session.SparkSession object at 0x7f79adddbbe0>
Context ready: <pyspark.sql.session.SparkSession object at 0x7f79adda6a20>
Context ready: <pyspark.sql.session.SparkSession object at 0x7f79adda6c18>
Context ready: <pyspark.sql.session.SparkSession object at 0x7f79ade6a518>
Context ready: <pyspark.sql.session.SparkSession object at 0x7f79addfceb8>
Context ready: <pyspark.sql.session.SparkSession object at 0x7f79addc1048>
Context ready: <pyspark.sql.session.SparkSession object at 0x7f79ade1f7b8>
Context ready: <pyspark.sql.session.SparkSession object at 0x7f79addf3400>
Context ready: <pyspark.sql.session.SparkSession object at 0x7f79addfc9b0>
Context ready: <pyspark.sql.session.SparkSession object at 0x7f79addbff28>
Context ready: <pyspark.sql.session.SparkSession object at 0x7f79addadef0>
Context ready: <pyspark.sql.session.SparkSession object at 0x7f79addcea58>


## Исследуем 'spark.memory.fraction' и 'spark.memory.storageFraction'

In [19]:
# Benchmark each calc function under every combination of
# 'spark.memory.fraction' x 'spark.memory.storageFraction'; all other settings
# keep their start_spark_session defaults.
# Relies on start_calc_1..3 (section "Функции для исследования") having been
# run first.
logger.info('\n'*1)
logger.info( 'spark.memory.fraction and spark.memory.storageFraction')
logger.info('\n'*1)
start_calc_fnc = [start_calc_1
                  ,start_calc_2
                  ,start_calc_3]

param_grid_fraction = { 'spark.memory.fraction'           : [0.6, 0.7, 0.8, 0.9]
                       ,'spark.memory.storageFraction'    : [0.5, 0.6, 0.7, 0.8]
                      }

for func in start_calc_fnc:
    logger.info(func.__name__)
    # Best (smallest) runtime seen so far for this function.
    min_time = float('inf')
    # Generators are single-use, so build a fresh one per function.
    param_grid_iter_fraction = iterator_dict(**param_grid_fraction)
    for item in param_grid_iter_fraction:
        # Memory fractions are static settings, so the previous session must
        # be stopped first. globals().get avoids a NameError when no `spark`
        # exists yet (fresh kernel).
        previous_session = globals().get('spark')
        if previous_session is not None:
            previous_session.stop()
        spark = start_spark_session(application_name = 'pi pu pa',
                                    memory_fraction=item['spark.memory.fraction'],
                                    memory_storageFraction=item['spark.memory.storageFraction'])
        # Measure once with a monotonic clock, so the compared and the logged
        # value are the same number.
        start_time = time.perf_counter()
        # NOTE(review): start_calc_* only build a lazy DataFrame (no action),
        # so this may time query planning rather than execution.
        func()
        elapsed = time.perf_counter() - start_time
        if elapsed <= min_time:
            min_time = elapsed
            logger.info('##########################################')
            logger.info(f'Оптимальные параметры! {min_time}')
            logger.info("Характеристики: memory_fraction={}, memory_storageFraction={}, остальные параметры дефолтные".format(item['spark.memory.fraction'], item['spark.memory.storageFraction']))
            logger.info('##########################################')
        else:
            logger.info(f'Неоптимальные параметры! {min_time}')

Context ready: <pyspark.sql.session.SparkSession object at 0x7f79addc1d68>
Context ready: <pyspark.sql.session.SparkSession object at 0x7f79addee940>
Context ready: <pyspark.sql.session.SparkSession object at 0x7f79ade1ff98>
Context ready: <pyspark.sql.session.SparkSession object at 0x7f79ade2e9b0>
Context ready: <pyspark.sql.session.SparkSession object at 0x7f79ade61c88>
Context ready: <pyspark.sql.session.SparkSession object at 0x7f79ade2ef98>
Context ready: <pyspark.sql.session.SparkSession object at 0x7f79addc1f60>
Context ready: <pyspark.sql.session.SparkSession object at 0x7f79addc1828>
Context ready: <pyspark.sql.session.SparkSession object at 0x7f792c89ae10>
Context ready: <pyspark.sql.session.SparkSession object at 0x7f79addc1320>
Context ready: <pyspark.sql.session.SparkSession object at 0x7f79addb5048>
Context ready: <pyspark.sql.session.SparkSession object at 0x7f79ade1f9b0>
Context ready: <pyspark.sql.session.SparkSession object at 0x7f79adddb160>
Context ready: <pyspark.s

## Функции для исследования

In [11]:
def start_calc_1():
    """Benchmark workload #1: the (placeholder) SQL query, limited to 2 rows.

    Uses the global ``spark`` session and returns None. The SQL text is empty
    in this copy and must be filled in.

    NOTE(review): ``.limit(2)`` is a lazy transformation and no action follows.
    For a SELECT query, no Spark job presumably runs here, so timing this
    function measures query parsing/analysis only.
    """
    query = spark.sql("""""")
    query.limit(2)

In [12]:
def start_calc_2():
    """Benchmark workload #2: the (placeholder) SQL query, limited to 2 rows.

    Uses the global ``spark`` session and returns None. The SQL text is empty
    in this copy and must be filled in.

    NOTE(review): no action follows ``.limit(2)``, so for a SELECT query no
    Spark job presumably runs. An alternative that forces execution:
    ``.write.format('parquet').mode('overwrite').saveAsTable('spark_optim_trans')``.
    """
    query = spark.sql("""""")
    query.limit(2)

In [13]:
def start_calc_3():
    """Benchmark workload #3: the (placeholder) SQL query, limited to 2 rows.

    Uses the global ``spark`` session and returns None. The SQL text is a bare
    newline in this copy and must be filled in.

    NOTE(review): no action follows ``.limit(2)``, so for a SELECT query no
    Spark job presumably runs. An alternative that forces execution:
    ``.write.format('parquet').mode('overwrite').saveAsTable('spark_optim_fns')``.
    """
    query = spark.sql("""
""")
    query.limit(2)

In [4]:
import requests
### Remove a workflow (wf_id) from the CTL schedule.
def delete_sched(CTL_URL, wf_id, timeout=None):
    """Send DELETE {CTL_URL}/v1/api/wf/sched/{wf_id} to unschedule a workflow.

    Prints the wf_id being unscheduled and the HTTP status code.

    Args:
        CTL_URL: base URL of the CTL REST API.
        wf_id: workflow id; anything ``int()`` accepts.
        timeout: optional ``requests`` timeout in seconds. ``None`` (default)
            waits indefinitely.

    Returns:
        The ``requests`` response object.

    Raises:
        Any error (invalid ``wf_id``, connection failure) is re-raised after
        printing an error message.
    """
    try:
        print("delete sched (wf_id: {})".format(wf_id))
        headers = {'Content-Type':'application/json'}
        wf_id = int(wf_id)
        response = requests.delete("{0}/v1/api/wf/sched/{1}".format(CTL_URL, wf_id),
                                   headers = headers, timeout = timeout)
        print(response.status_code)
    except Exception:
        print("ERROR: Unable to delete sched!")
        raise
    return response

### Put a workflow (wf_id) back on the CTL schedule.
def put_sched(CTL_URL, wf_id, timeout=None):
    """Send PUT {CTL_URL}/v1/api/wf/sched/{wf_id} to reschedule a workflow.

    Prints the wf_id being rescheduled and the HTTP status code.

    Args:
        CTL_URL: base URL of the CTL REST API.
        wf_id: workflow id; anything ``int()`` accepts.
        timeout: optional ``requests`` timeout in seconds. ``None`` (default)
            waits indefinitely.

    Returns:
        The ``requests`` response object.

    Raises:
        Any error (invalid ``wf_id``, connection failure) is re-raised after
        printing an error message.
    """
    try:
        print("put sched (wf_id: {})".format(wf_id))
        headers = {'Content-Type':'application/json'}
        wf_id = int(wf_id)
        response = requests.put("{0}/v1/api/wf/sched/{1}".format(CTL_URL, wf_id),
                                headers = headers, timeout = timeout)
        print(response.status_code)
    except Exception:
        print("ERROR: Unable to put sched!")
        raise
    return response


# Расчет с помощью запуска потоков (автоматическое тестирование)

In [44]:
# Модуль обучения (передаем номер потока, который начал дольше считаться) CHECK_SPARK_PARAMS (SPARMS):
# запустить каждый модуль потока отдельно, вырубив остальные модули и чекаться на этом

# 1) поискать в потоке все модули по параметрам потока, ориентируясь на custom_param
# 2) в цикле пройтись и запустить поток с одним включенным модулем и чекать время выполнения
# 3) сохранить результаты в таблице:
#       Номер потока; название модуля; текущая дата расчета; параметры
import requests
import json
from re import search
import re
import time
import datetime

# Workflow whose modules are benchmarked, and the CTL REST API base URL
# (left empty here; set it before running).
wf_id, CTL_URL = 11111, ''
# NOTE(review): not used in the visible part of this cell; presumably the
# results table mentioned in step 3 of the plan above.
df = spark.table('spark_params')

# Fetch the parameters of a workflow.
def get_param_wf_id(CTL_URL, wf_id, timeout=None):
    """GET {CTL_URL}/v1/api/params/wf/{wf_id} and return the decoded JSON.

    Prints the HTTP status code.

    Args:
        CTL_URL: base URL of the CTL REST API.
        wf_id: workflow id; anything ``int()`` accepts.
        timeout: optional ``requests`` timeout in seconds. ``None`` (default)
            waits indefinitely.

    Returns:
        The decoded JSON body. The caller iterates it as a list of dicts with
        'param' and 'prior_value' keys.

    Raises:
        Any failure, including an HTTP 4xx/5xx status, is re-raised after
        printing an error message.
    """
    try:
        headers = {'Content-Type':'application/json'}
        response = requests.get("{0}/v1/api/params/wf/{1}".format(CTL_URL, int(wf_id)),
                                headers = headers, timeout = timeout)
        print(response.status_code)
        # Fail on an error status, so an unknown wf_id reports the error below
        # instead of returning an error payload that the caller then iterates.
        response.raise_for_status()
        json_dt = response.json()
    except Exception:
        print("ERROR: wf_id does not exist")
        raise
    return json_dt

# Launch a workflow run with the given load parameters.
def run_wf_with_param(CTL_URL, wf_id, load_param):
    """POST ``load_param`` as JSON to {CTL_URL}/v1/api/wf/sched/{wf_id}.

    Prints the HTTP status code and returns the decoded JSON response.
    Any failure (request or JSON decoding) prints an error message and is
    re-raised.
    """
    try:
        endpoint = "{0}/v1/api/wf/sched/{1}".format(CTL_URL, wf_id)
        reply = requests.post(endpoint,
                              headers={'Content-Type': 'application/json'},
                              json=load_param)
        print(reply.status_code)
        return reply.json()
    except BaseException:
        print("ERROR: Unable to run wf!")
        raise

# Fetch the status of a CTL loading.
def get_status_loading_id(CTL_URL, loading_id, timeout=None):
    """GET {CTL_URL}/v1/api/loading/{loading_id} and return the decoded JSON.

    Prints the HTTP status code.

    Args:
        CTL_URL: base URL of the CTL REST API.
        loading_id: id of the loading to query.
        timeout: optional ``requests`` timeout in seconds. ``None`` (default)
            waits indefinitely.

    Returns:
        The decoded JSON body.

    Raises:
        Any failure (request or JSON decoding) is re-raised after printing an
        error message.
    """
    try:
        headers = {'Content-Type':'application/json'}
        response = requests.get("{0}/v1/api/loading/{1}".format(CTL_URL, loading_id),
                                headers = headers, timeout = timeout)
        print(response.status_code)
        json_dt = response.json()
    except Exception:
        print("ERROR: Unable to get loading status!")
        raise
    return json_dt


# Complete a CTL loading (sent as a DELETE on the loading resource).
def complete_loading(CTL_URL, loading_id):
    """Send DELETE {CTL_URL}/v1/api/loading/{loading_id}; returns None.

    Prints the loading id and the HTTP status code. Any error (non-integer
    ``loading_id``, connection failure) prints an error message and is
    re-raised.
    """
    try:
        print("delete loading (ctl_loading: {})".format(loading_id))
        loading_endpoint = "{0}/v1/api/loading/{1}".format(CTL_URL, int(loading_id))
        resp = requests.delete(loading_endpoint, headers={'Content-Type': 'application/json'})
        print(resp.status_code)
    except BaseException:
        print("ERROR: Unable to delete loading!")
        raise


# Split the workflow's parameters:
# - names containing 'CUSTOM_PARAM' go to custom_param;
# - names containing 'LOAD' whose value is not 'N' go to load_param, except
#   the two excluded switches below.
custom_param = {'CUSTOM_PARAM':'N/A'}   # the workflow's custom_param_dc parameters
load_param = {}                         # the workflow's load_dc parameters

df_json = get_param_wf_id(CTL_URL, wf_id)
for i in df_json:
    # TODO: add a flag distinguishing Java vs Python transformations.
    if 'CUSTOM_PARAM' in i['param']:
        custom_param[i['param']] = i['prior_value']
        continue
    if ('LOAD' in i['param']
            and i['prior_value'] != 'N'
            and i['param'] not in ('LOAD_TRANSFER_FROM_SNP_TO_HIST_ON', 'LOAD_DC_DM_CLEAN_ON')):
        load_param[i['param']] = i['prior_value']

# Map each enabled module switch (load_param key) to its CUSTOM_PARAM key.
# Assumes keys shaped like CUSTOM_PARAM_<NAME> and LOAD_<NAME>_ON: 13 leading
# chars are dropped from the former, and 5 leading plus 3 trailing from the
# latter, before comparing the <NAME> parts.
moduls_cl_param = {}
for c_key in custom_param:
    for l_key in load_param:
        if str(c_key)[13:] != str(l_key)[5:-3]:
            continue
        print(l_key, c_key)
        moduls_cl_param[l_key] = c_key

# добавить запуск потока с включенным одним модулем, чтобы чекать время для него в общий алгос
# затем изменить параметр custom_param_dc у потока после нахождения минимума!!!
# Reduced search space for the per-module workflow runs.
# - Add a run of the workflow with a single module enabled, to time it inside
#   the common algorithm.
# - Then update the workflow's custom_param_dc once the minimum is found.
# The serializer, Kryo buffers, memoryOverhead, memory fractions and the
# shuffle settings from the full grid are disabled here. The timeout is pinned
# to a single value; for the full set use [120, 500, 1000, 3000].
param_grid = dict([
    ('spark.executor.cores', [4, 5]),
    ('spark.network.timeout', [1000]),
    ('spark.driver.memory', [2, 4, 8, 16]),
    ('spark.executor.memory', [8, 16, 24, 32]),
])

# Iterator over the reduced grid (the loop below re-creates it per module).
param_grid_iter = iterator_dict(**param_grid)
# Smallest execution time found so far.
min_time = 100000000.0
# Accumulator for parameter sets; presumably filled further down (not visible
# here). A separate min_params dict for the best parameters was considered.
new_params = []
for key_load in load_param:
    load_param_copy = load_param.copy()
    for key_copy in load_param_copy:
        load_param_copy.update({key_copy: 'N'})
    param_grid_iter = iterator_dict(**param_grid)
    min_time = float(100000000.0)
    for item in param_grid_iter:
        #if spark:
        #    spark.stop()
        #spark = start_spark_session( #type_serializer=item['spark.serializer'],
        #                    #kryo_buffer_max=item['spark.kryoserializer.buffer.max'], kryo_buffer  =  item['spark.kryoserializer.buffer'], 
        #                    executor_cores =item['spark.executor.cores'],            driver_memory = item['spark.driver.memory'], 
        #                    executor_memory=item['spark.executor.memory'],           memory_overhead=item['spark.executor.memoryOverhead'],
        #                    #memory_fraction=item['spark.memory.fraction'],           memory_storageFraction=item['spark.memory.storageFraction'], 
        #                    #shuffle_file_buffer=item['spark.shuffle.file.buffer'],   
        #                    network_timeout=item['spark.network.timeout'])
        #print(spark)
        load_param_copy[key_load] = 'Y'

        #print(load_param_copy)
        custom_param_for_this_load = moduls_cl_param[key_load]
        #print(custom_param_for_this_load)
        custom_param_key = custom_param[custom_param_for_this_load]
        logger.info(custom_param_key)
        custom_param_key_list = ["".join(x.rstrip()) for x in custom_param_key.split('--') if x != '']
        #print(custom_param_key_list)
        params = {'spark.serializer'                                 :'org.apache.spark.serializer.KryoSerializer',
                  'spark.kryoserializer.buffer.max'                  :'32m',
                  'spark.kryoserializer.buffer'                      :'32m',
                  'spark.memory.fraction'                            :'0.6',
                  'spark.memory.storageFraction'                     :'0.5',
                  'spark.shuffle.file.buffer'                        :'128',
                  'spark.shuffle.manager'                            :'sort',
                  'spark.shuffle.compress'                           :'true',
                  'spark.shuffle.spill.compress'                     :'true',
                  'spark.shuffle.service.enabled'                    :'true',
                  'spark.dynamicAllocation.enabled'                  :'true',
                  'spark.dynamicAllocation.executorIdleTimeout'      :'120s',
                  'spark.dynamicAllocation.cachedExecutorIdleTimeout':'600s',
                  'spark.dynamicAllocation.initialExecutors'         :'0',
                  'spark.dynamicAllocation.minExecutors'             :'0',
                  'spark.dynamicAllocation.maxExecutors'             :'300'
                }
        
        for i in custom_param_key_list:
            if 'num-executors' in i or 'executor-memory' in i or 'executor-cores' in i or 'driver-memory' in i:
                key, value = i.split(" ")
                if key == 'num-executors':
                    continue
                elif key == 'executor-memory':
                    params.update({'spark.executor.memory': value})
                elif key == 'executor-cores':
                    params.update({'spark.executor.cores': value})
                elif key == 'driver-memory':
                    params.update({'spark.driver.memory': value})
            else:
                key, value = i.replace('conf ', '').split("=")
                params.update({key: value})
        params.update(item)
        params.update({'spark.executor.memoryOverhead': round(params.get('spark.executor.memory')*1024*0.1)})  # 1024*0.1 (переводим гегабайты в мегабайты а затем берем 10%)
        
        custom_param_final_str = "--conf spark.serializer={} --conf spark.kryoserializer.buffer.max={} --conf spark.kryoserializer.buffer={} --conf spark.executor.cores={} --conf spark.driver.memory={}g \
--conf spark.executor.memory={}g --conf spark.executor.memoryOverhead={}m --conf spark.memory.fraction={} --conf spark.memory.storageFraction={} --conf spark.shuffle.file.buffer={} \
--conf spark.network.timeout={}s".format(params['spark.serializer'],              params['spark.kryoserializer.buffer.max'], params['spark.kryoserializer.buffer'], 
                                         params['spark.executor.cores'],          params['spark.driver.memory'],             params['spark.executor.memory'], 
                                         params['spark.executor.memoryOverhead'], params['spark.memory.fraction'],           params['spark.memory.storageFraction'], 
                                         params['spark.shuffle.file.buffer'],     params['spark.network.timeout'])
        
        params_all_for_wf = load_param_copy.copy()
        params_all_for_wf.update({custom_param_for_this_load: custom_param_final_str})
        logger.info(params_all_for_wf)
        # break
        
        # брать время с параметров загрузки по курлу
        # start_time = time.time()
        
        loading_id = run_wf_with_param(CTL_URL, wf_id, params_all_for_wf)[1]      # запуск потока с включенным одним модулем
        status_load = get_status_loading_id(CTL_URL, loading_id)
        while (status_load['alive']=='ACTIVE' and (status_load['status'] in ("INIT","START","TIME-WAIT","EVENT-WAIT","LOCK-WAIT","PREREQ","LOCK","PARAM","RUNNING"))):
            time.sleep(300)
            status_load = get_status_loading_id(CTL_URL, loading_id)
        if status_load['alive']=='ABORTED':
            raise Exception('Error params') # идем к следующему расчету
        
        elif (status_load['alive']=='ACTIVE' and status_load['status']=='ERROR') or (status_load['alive']=='ACTIVE' and status_load['status']=='ERRORCHECK'):
            complete_loading(CTL_URL, loading_id)
            raise Exception('Error params') # идем к следующему расчету
        
        elif status_load['alive']=='COMPLETED' or (status_load['alive']=='ACTIVE' and status_load['status']=='SUCCESS'):
            start_dttm = status_load['start_dttm']
            end_dttm = status_load['end_dttm']
            start_dt = datetime.datetime.strptime(start_dttm[:19], '%Y-%m-%d %H:%M:%S')
            end_dt = datetime.datetime.strptime(end_dttm[:19], '%Y-%m-%d %H:%M:%S')
        logger.info(f'START date, {start_dt}')
        logger.info(f'END date, {end_dt}')
        ############# сохранить резы в табличку #############
        
        if float((end_dt - start_dt).total_seconds()) < min_time:
            min_time = float((end_dt - start_dt).total_seconds())
            workflow_name = status_load['workflow']['name']
            gregor_dt = str(datetime.datetime.now().date())
            spark_params_old = custom_param_key
            spark_params_new = json.dumps(params)
            ctl_datechange = str(datetime.datetime.now())
            new_params = [[workflow_name,gregor_dt,min_time,spark_params_old,spark_params_new,loading_id,ctl_datechange,wf_id,key_load]]
            logger.info('##########################################')
            logger.info(f'Оптимальные параметры! {min_time}')
            logger.info(f'Характеристики для потока {wf_id} {workflow_name} и модуля {key_load} за минимальное время {min_time}:\n {key_load}')
            logger.info('##########################################')
        else:
            logger.info(f'Неоптимальные параметры! {min_time}')
        logger.info('%s секунд' % min_time)
    df_new_rows = spark.createDataFrame(new_params, df.columns)
    df_new_rows.write.partitionBy('workflow_id','model_name').format('hive').mode('append').saveAsTable('spark_params')

200
LOAD_DC_CDM_RISK_MMB_SCORE_RBL_HIST_ON CUSTOM_PARAM_DC_CDM_RISK_MMB_SCORE_RBL_HIST
LOAD_DC_COMPACTOR_ON CUSTOM_PARAM_DC_COMPACTOR
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
200
20

In [1]:
# Preview the results table that the per-module tuning loop appends to.
# Needs an active `spark` session; on a fresh kernel this cell raises NameError.
df = spark.table('spark_params')
df.show(n=10, truncate=0)   # first 10 rows, columns not truncated

NameError: name 'spark' is not defined

In [None]:
# DDL for the results table the per-module tuning loop appends to. The column order
# (workflow_name ... ctl_datechange, then partitions workflow_id, model_name)
# matches the 9-element rows that loop builds.
# NOTE(review): ctl_datechange reuses gregor_dt's SQL comment ('calculation date').
spark_params_ddl = """CREATE TABLE IF NOT EXISTS spark_params     (
                                               workflow_name STRING COMMENT ''
                                              ,gregor_dt TIMESTAMP COMMENT 'дата расчета'
                                              ,min_times BIGINT COMMENT ''
                                              ,spark_params_old STRING COMMENT ''
                                              ,spark_params_new STRING COMMENT ''
                                              ,ctl_loading BIGINT COMMENT ''
                                              ,ctl_datechange TIMESTAMP COMMENT 'дата расчета'
                                      )
COMMENT 'таблица технического мониторинга'
PARTITIONED BY (    workflow_id BIGINT COMMENT ''
                   ,model_name STRING COMMENT ''
               )
STORED AS PARQUET
TBLPROPERTIES ('transactional'='false')
"""
ddl_result = spark.sql(spark_params_ddl)
ddl_result.show(10, 0)



# Расчет с помощью отдельных функций (РУЧНОЕ ТЕСТИРОВАНИЕ)

In [10]:
# Training module CHECK_SPARK_PARAMS (SPARMS): receives the id of a workflow whose
# runtime has started to grow.
# Run each module of the workflow on its own, with the other modules switched off,
# and tune on that run.
#
# 1) find all modules of the workflow from its parameters, using custom_param
# 2) in a loop, run the workflow with one module enabled and measure its runtime
# 3) save the results to a table:
#       workflow id; module name; current calculation date; parameters
#
# This manual variant restarts a SparkSession for every grid point and times each
# start_calc_* function directly.
# NOTE: the full grid below has 2 * 4**10 (~2.1 million) combinations -- trim it
# before running.

param_grid = { 'spark.serializer'                : ['org.apache.spark.serializer.KryoSerializer', 'org.apache.spark.serializer.JavaSerializer']    # pick one value for all cases
              ,'spark.kryoserializer.buffer.max' : [32, 64, 128, 256]      # pick one value for all cases
              ,'spark.kryoserializer.buffer'     : [32, 64, 128, 256]      # pick one value for all cases
              ,'spark.executor.cores'            : [2, 3, 4, 5]            # pick one value for all cases
              ,'spark.network.timeout'           : [120, 500, 1000, 3000]  # pick one value for all cases
              ,'spark.driver.memory'             : [2, 4, 8, 16]
              ,'spark.executor.memory'           : [8, 16, 24, 32]
              ,'spark.executor.memoryOverhead'   : [1, 2, 4, 8]
              ,'spark.memory.fraction'           : [0.6, 0.7, 0.8, 0.9]
              ,'spark.memory.storageFraction'    : [0.5, 0.6, 0.7, 0.8]
              ,'spark.shuffle.file.buffer'       : [64, 128, 256, 512]
              #,'spark.shuffle.manager'           : ['sort']
              #,'spark.shuffle.compress'          : [True]
              #,'spark.shuffle.spill.compress'    : [True]
              }

min_time = float('inf')   # shortest runtime (seconds) seen for the current function
min_params = {}           # grid point that produced min_time
start_calc_fnc = [start_calc_1
                 ,start_calc_2
                 ,start_calc_3]  # functions to time

# On a fresh kernel `spark` does not exist yet; a bare `if spark:` would raise NameError.
spark = globals().get('spark')
for func in start_calc_fnc:
    logger.info(func.__name__)
    min_time = float('inf')
    min_params = {}
    for item in iterator_dict(**param_grid):
        if spark is not None:
            spark.stop()
        spark = start_spark_session(application_name = 'pi pu pa',                   type_serializer=item['spark.serializer'],
                            kryo_buffer_max=item['spark.kryoserializer.buffer.max'], kryo_buffer  =  item['spark.kryoserializer.buffer'],
                            executor_cores =item['spark.executor.cores'],            driver_memory = item['spark.driver.memory'],
                            executor_memory=item['spark.executor.memory'],           memory_overhead=item['spark.executor.memoryOverhead'],
                            memory_fraction=item['spark.memory.fraction'],           memory_storageFraction=item['spark.memory.storageFraction'],
                            shuffle_file_buffer=item['spark.shuffle.file.buffer'],   network_timeout=item['spark.network.timeout'])
        logger.info(spark)

        start_time = time.time()
        func()   # TODO: replace with a function that launches the workflow with a specific module
        # Measure once, so the value compared is the same value that gets stored.
        elapsed = time.time() - start_time

        if elapsed <= min_time:
            min_time = elapsed
            min_params = dict(item)
            logger.info('##########################################')
            logger.info(f'Оптимальные параметры! {min_time}')
            logger.info(f'Характеристики: {min_params}')
            logger.info('##########################################')
        else:
            logger.info(f'Неоптимальные параметры! {min_time}')
    logger.info(f'{func.__name__}: best time {min_time}, params {min_params}')

Context ready: <pyspark.sql.session.SparkSession object at 0x7f8860ede630>
<pyspark.sql.session.SparkSession object at 0x7f8860ede630>
+------+-------+-------+--------+----------------+-----------+--------------+-----------------+------------------+----------------+--------+-----------------+----------------+-----------+-----------------+----------------+----------------+----------+--------+---+-------+----------------+-----------+--------------+--------+---------------+--------------+------------------+----------+----------+---------------+----------------+----------------------+-----------------------+-------+------------+---------------------+----------------+---+-----------+---+--------------------+---------------------------+----+----+----+---+---------+-------------------+------------+---------------+-----------------+------------------------+---------------------+-----+-----+-----+------+------+------+------+------+------+---+---+------------------+----------------+-------------

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/usr/sdp/current/spark2-client/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1152, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/opt/rh/rh-python36/root/lib64/python3.6/socket.py", line 586, in readinto
    return self._sock.recv_into(b)
  File "/usr/sdp/current/spark2-client/python/pyspark/context.py", line 269, in signal_handler
    self.cancelAllJobs()
  File "/usr/sdp/current/spark2-client/python/pyspark/context.py", line 1039, in cancelAllJobs
    self._jsc.sc().cancelAllJobs()
AttributeError: 'NoneType' object has no attribute 'sc'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/sdp/current/spark2-client/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 985, in send_command
    response = connection.send_command(command)
  File "/usr/sdp/current/spark2-client/python/lib/

Py4JError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext

In [None]:
# Модуль поиска неоптимальных расчетов CHECK_WF_TIME (WFT)
# 1) хранить в таблице по номерам потока историю расчета - сколько времени занимал расчет потока (можно ли получить историю сколько времени занимал расчет каждого модуля)?
# Таблица: 
#      номер потока; текущая дата чека; время начала расчета; время окончания расчета; продолжительность
# 2) если потратилось больше времени, чем среднее время расчета за весь период с отклонением 10%, то запустить перерасчет параметров
# 3) получить значения новых параметров после перерасчета и изменить их для каждого модуля у потока, оповестив об этом в почте

In [11]:
# Stop the SparkSession held in the global `spark` (created by an earlier cell).
spark.stop()