In [1]:
import boto3
import pandas as pd
import os

In [2]:
# YandexCloud init
# Build an S3-compatible boto3 client pointed at the YandexCloud endpoint.
# Endpoint, credentials and container name are all taken from environment
# variables; a missing variable raises KeyError here, before anything else runs.
ya_service_name = 's3'
ya_url, ya_key_id, ya_key, container_name = (
    os.environ[env_var]
    for env_var in ('ya_url', 'ya_key_id', 'ya_key', 'container_name')
)

session = boto3.session.Session()
s3 = session.client(
    service_name=ya_service_name,
    endpoint_url=ya_url,
    aws_access_key_id=ya_key_id,
    aws_secret_access_key=ya_key,
)


def reader_csv(container_name, key_name, is_header=True):
    '''
        Download a single CSV object from storage and parse it with pandas:
          container_name -- str, container (bucket) name in storage
          key_name -- str, key (path) of the CSV object inside the container
          is_header -- bool, True: the first row is the header (pandas default
                       'infer'); False: there is no header row (header=None)
        Return: DataFrame
    '''
    body = s3.get_object(Bucket=container_name, Key=key_name).get("Body")
    # 'infer' is read_csv's own default, so is_header=True is the plain call.
    header_mode = 'infer' if is_header else None
    return pd.read_csv(body, header=header_mode)


# Read the list of switched-on stores; each ObjCode becomes one unit of work
# for the batching done further down.
# NOTE(review): the container is hard-coded as 'spardata' although
# `container_name` is read from the environment above and never used —
# confirm which one is intended.
stores_csv_key = 'switch_on/stores.csv'
switch_on = reader_csv('spardata', stores_csv_key, is_header=True)
stores = switch_on['ObjCode'].tolist()

In [4]:
# WF config
# Memory for every generated container template, rendered as "<n>Gi".
ram_need = 14        # memory request
ram_up_limit = 18    # memory limit
nodes = 12           # VMs count for the task: number of store batches run in parallel

# Cron schedule fields, rendered as "minute hour day-of-month month day-of-week";
# '*' means "every".
minutes, hours = 0, 0
month_day = month = day_of_week = '*'

In [6]:
# Split the stores round-robin into `nodes` batches: store i goes to batch
# i % nodes, preserving the original order inside each batch. Each batch
# becomes one disc-update -> prediction task pair in the generated DAG.
if nodes < 1:
    raise ValueError(f"nodes must be a positive integer, got {nodes!r}")

# stores[k::nodes] is exactly the set of stores with index % nodes == k.
# Batches left empty (fewer stores than nodes) are dropped so that no task
# is generated with an empty `--stores ''` argument.
stores_list = [
    batch
    for batch in (stores[k::nodes] for k in range(nodes))
    if batch
]

In [8]:
# templates
# YAML fragments for the Argo CronWorkflow manifest. The final manifest is
# built by plain string concatenation, so the leading newline and the exact
# indentation inside each literal are significant: each fragment must line up
# with the YAML nesting level at which it is appended.

# Memory request/limit block appended to every container template below
# (ram_need / ram_up_limit come from the WF config cell).
resources = f"""
                  resources:
                      requests:
                          memory: {ram_need}Gi
                      limits:
                          memory: {ram_up_limit}Gi"""

# Manifest header up to and including the first DAG task ("prep").
# The single `{}` is the cron schedule string; it is filled in with
# str.format when the manifest is assembled.
head = """apiVersion: argoproj.io/v1alpha1
kind: CronWorkflow
metadata:
    name: cron-dag-spar-wf
spec:
    schedule: "{}"
    concurrencyPolicy: "Replace"
    startingDeadlineSeconds: 0
    workflowSpec:
        entrypoint: prediction-pipeline
        templates:
            - name: prediction-pipeline
              dag:
                tasks:

                - name: prep
                  template: python-preparing-script"""

# Per-batch DAG task that depends on "prep".
# Both `{}` placeholders are the batch index.
dag_disc_update = """
                - name: disc-update{}
                  dependencies: [prep]
                  template: python-disc-updating-script{}"""

# Per-batch prediction task that depends on the disc-update task of the same
# batch. All three `{}` placeholders are the batch index.
dag_pred = """
                - name: prediction{}
                  dependencies: [disc-update{}]
                  template: python-prediction-script{}"""

# Final DAG task; `{}` is the comma-separated list of all prediction task
# names, so this task depends on every prediction task.
dag_promo_delete_temp = """
                - name: temp-promo-delete
                  dependencies: [{}]
                  template: python-temp-promo-del-script"""

# Container template for the "prep" task (no placeholders).
prep_image_run = """
            - name: python-preparing-script          
              script:
                  image: cr.yandex/crp479pqesbl9eqd05c2/preparing:latest
                  command: [bash]
                  source: |
                      python preparing.py""" + resources
    
# Per-batch container template for disc-update: the first `{}` is the batch
# index, the second the comma-separated store codes passed via --stores.
# NOTE(review): the store list is wrapped in single quotes for bash, which
# assumes store codes never contain a single quote — confirm.
disc_update_image_run = """
            - name: python-disc-updating-script{}
              script:
                  image: cr.yandex/crp479pqesbl9eqd05c2/discount_updating:latest
                  command: [bash]
                  source: |
                      python discount_updating.py --stores '{}'""" + resources


# Per-batch container template for prediction; placeholders as in
# disc_update_image_run (batch index, then comma-separated store codes).
pred_image_run = """
            - name: python-prediction-script{}
              script:
                  image: cr.yandex/crp479pqesbl9eqd05c2/prediction:latest
                  command: [bash]
                  source: |
                      python prediction1.py --stores '{}'""" + resources

# Container template for the final temp-promo-delete task (no placeholders).
promo_delete_image_run = """
            - name: python-temp-promo-del-script
              script:
                  image: cr.yandex/crp479pqesbl9eqd05c2/temp_promo_deleting:latest
                  command: [bash]
                  source: |
                      python temp_promo_deleting.py""" + resources

In [9]:
# Assemble the CronWorkflow manifest and write it to cron_dag_final.yaml.
# Layout: header + "prep" task, one disc-update -> prediction task pair per
# batch in `stores_list`, a final temp-promo-delete task depending on every
# prediction task, then all container templates.
batch_ids = range(len(stores_list))
# Comma-separated store codes for each batch, computed once and reused by
# both the disc-update and the prediction container templates.
batch_store_args = [','.join(map(str, batch_stores)) for batch_stores in stores_list]

# DAG task entries (each template fragment already starts with a newline).
full_dag_disc_update = '\n'.join(dag_disc_update.format(i, i) for i in batch_ids)
full_dag_prediction = '\n'.join(dag_pred.format(i, i, i) for i in batch_ids)
all_pred_dags = ','.join(f'prediction{i}' for i in batch_ids)
full_dag_promo_delete_temp = dag_promo_delete_temp.format(all_pred_dags)

# Container templates, one per batch.
disc_images_run = '\n'.join(
    disc_update_image_run.format(i, store_args)
    for i, store_args in zip(batch_ids, batch_store_args)
)
pred_images_run = '\n'.join(
    pred_image_run.format(i, store_args)
    for i, store_args in zip(batch_ids, batch_store_args)
)

# Cron expression: "minute hour day-of-month month day-of-week".
schedule = f'{minutes} {hours} {month_day} {month} {day_of_week}'
dag_tasks = '\n'.join([
    head.format(schedule),
    full_dag_disc_update,
    full_dag_prediction,
    full_dag_promo_delete_temp,
])
dag_templates = '\n'.join([
    prep_image_run,
    disc_images_run,
    pred_images_run,
    promo_delete_image_run,
])
dag_final = dag_tasks + '\n\n' + dag_templates

# `with` guarantees the file is flushed and closed even if the write fails.
with open("cron_dag_final.yaml", "w", encoding="utf-8") as f:
    f.write(dag_final)