In [2]:
from azureml.core import Experiment
from azureml.core.compute import AmlCompute, ComputeTarget
from azureml.core.datastore import Datastore
from azureml.core.runconfig import CondaDependencies, RunConfiguration
from azureml.data.data_reference import DataReference
from azureml.pipeline.core import Pipeline, PipelineData
from azureml.pipeline.steps import PythonScriptStep
from azureml.core import Workspace, Experiment
from azureml.pipeline.core import OutputPortBinding
import pandas as pd
import azureml
from loader import reader_csv
import io

In [3]:
print("SDK version:", azureml.core.VERSION)

SDK version: 1.19.0


In [4]:
from azureml.core.authentication import InteractiveLoginAuthentication

interactive_auth = InteractiveLoginAuthentication(tenant_id="3b920f4d-d9b6-41a0-927e-89d6664219bc")

ws = Workspace(subscription_id="4ca4328f-9db7-4cbc-ae4b-380b5a441a58",
               resource_group="mlservice",
               workspace_name="mlservice",
               auth=interactive_auth)

# Cluster's list on the subscriptuion

In [7]:
cts = ws.compute_targets
for ct in cts:
    print(ct)

spar


# Get a list of objects for prediction

In [8]:
switch_on = reader_csv('spardata', r'switch_on/stores.csv' )
stores = switch_on['ObjCode'].to_list()

# Deleting unnecessary pipelines

In [10]:
from azureml.pipeline.core import PublishedPipeline
published_pipeline = PublishedPipeline.get(ws, '{}'.format('bd4c6391-6b73-40ba-87dd-e3ba5ed9a638'))
published_pipeline.disable()

# Pipeline creation

In [12]:
compute_target = AmlCompute(ws, "spar")

conda_dependencies = CondaDependencies.create(
    pip_packages=["boto3==1.21.15", "pandas==1.4.1", "catboost==1.2",  "pyjwt==1.7.1", "xlrd", 'requests'], python_version="3.11"
) # "catboost",
run_config = RunConfiguration(conda_dependencies=conda_dependencies)
run_config.environment.docker.enabled = True
run_config.source_directory_data_store = "workspaceblobstore"



steps = []


score_preparing = PythonScriptStep(allow_reuse=False,
    name="preparing",
    script_name="preparing.py",
    compute_target=compute_target,
    runconfig=run_config,
    source_directory = "preparing")

score_temp_promo_deleting = PythonScriptStep(allow_reuse=False,
    name="promo_deleting",
    script_name="temp_promo_deleting.py",
    compute_target=compute_target,
    runconfig=run_config,
    source_directory = "temp_promo_deleting")


for stores in stores_list:
    
    score_step = PythonScriptStep(allow_reuse=False,
        name=f"prediction_scoring_{stores}",
        script_name="prediction1.py",
        arguments=[ "--stores", "{}".format(stores)],
        compute_target=compute_target,
        runconfig=run_config,
        source_directory = "prediction")
    

    
    score_step1 = PythonScriptStep(allow_reuse=False,
        name="discount_scoring",
        script_name="discount_updating.py",
        arguments=[ "--stores", "{}".format(stores)],
        compute_target=compute_target,
        runconfig=run_config,
        source_directory = "discount_updating")
    
    score_step1.run_after(score_preparing)
    score_step.run_after(score_step1)
    score_temp_promo_deleting.run_after(score_step)

steps.append(score_temp_promo_deleting)

In [13]:
pipeline = Pipeline(workspace=ws, steps=steps)

In [14]:
len(steps)

1

# Публикация пайплайна

In [15]:
pipeline_name = "pipeline_25_09_2023"
print(pipeline_name)

published_pipeline = pipeline.publish(
    name=pipeline_name,
    description=pipeline_name)
print("Newly published pipeline id: {}".format(published_pipeline.id))

pipeline_25_09_2023
Created step promo_deleting [84780082][cc97b1f7-ed78-4ddb-97b0-65bb659f3d9a], (This step will run and generate new outputs)
Created step prediction_scoring_1553 [7d413967][bbf26366-c781-4c55-b9ee-b4a6c913387b], (This step will run and generate new outputs)
Created step discount_scoring [83a70207][d45b71d5-b064-49b0-96ad-c9c8a5e0f76d], (This step will run and generate new outputs)
Created step preparing [2ba7577d][783691c4-c522-473f-92a4-17ebc26ed4b1], (This step will run and generate new outputs)
Created step prediction_scoring_1554 [49f38c77][687cc51a-60c4-4cc9-b83b-50b9c7eaae3e], (This step will run and generate new outputs)
Created step discount_scoring [c7f8f3d9][8aa96045-a2e5-4d3c-bf6d-f8b13ae3c688], (This step will run and generate new outputs)
Created step prediction_scoring_1567 [bcb509bd][618758f5-c645-4d23-9b7b-7df5a2bc6b6a], (This step will run and generate new outputs)
Created step discount_scoring [239241b7][25a9d008-334b-4510-b54b-cf947318df00], (This 

# Запуск пайплайна с ноутбука

In [16]:
from azureml.core.authentication import InteractiveLoginAuthentication
import requests

rest_endpoint = published_pipeline.endpoint
interactive_auth = InteractiveLoginAuthentication()
auth_header = interactive_auth.get_authentication_header()

response = requests.post(rest_endpoint,
                         headers=auth_header,
                         json={"ExperimentName": "Prediction_pipeline"})
run_id = response.json()["Id"]
print(run_id)

41b8e8ed-aa4b-4c0c-b5fe-919d3dd918d6


# Поставить пайплайн на расписание

In [24]:
published_pipeline.id

'8c65babd-e502-4d17-a506-6fab4fdd6a07'

In [25]:
from azureml.pipeline.core.schedule import ScheduleRecurrence, Schedule

recurrence = ScheduleRecurrence(frequency="Week", interval=1, 
                                week_days = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday"],
                                hours=[12], minutes=[10]) 


schedule = Schedule.create(workspace=ws, name="Forecasting_Schedule",
                           pipeline_id='{}'.format(published_pipeline.id), 
                           experiment_name="Prediction_pipeline",
                           recurrence=recurrence,
                           wait_for_provisioning=True,
                           description="Schedule Run")

# You may want to make sure that the schedule is provisioned properly
# before making any further changes to the schedule

print("Created schedule with id: {}".format(schedule.id))

Provisioning status: Completed
Created schedule with id: 85e5dc56-7d57-415c-8b14-4cb63887a89f


In [20]:
from azureml.pipeline.core.schedule import ScheduleRecurrence, Schedule

recurrence = ScheduleRecurrence(frequency="Week", interval=1, 
                                week_days = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday"],
                                hours=[0], minutes=[10]) 


schedule = Schedule.create(workspace=ws, name="Forecasting_Schedule",
                           pipeline_id='02d69225-86a1-40fe-9d11-6699183702e0', 
                           experiment_name="Prediction_pipeline",
                           recurrence=recurrence,
                           wait_for_provisioning=True,
                           description="Schedule Run")

# You may want to make sure that the schedule is provisioned properly
# before making any further changes to the schedule

print("Created schedule with id: {}".format(schedule.id))

Provisioning status: Completed
Created schedule with id: 8377260a-2ec2-4239-9a81-085444285bf5


# Посмотреть активные расписания и отключить какое-то, если необходимо

In [5]:
from azureml.pipeline.core.schedule import ScheduleRecurrence, Schedule
schedules = Schedule.list(ws, active_only=True)

In [7]:
# aee710f1-d0bb-4d19-9ed4-063d29b44b5d
schedules

[Pipeline(Name: Forecasting_Schedule,
 Id: 8377260a-2ec2-4239-9a81-085444285bf5,
 Status: Active,
 Pipeline Id: 02d69225-86a1-40fe-9d11-6699183702e0,
 Pipeline Endpoint Id: None,
 Recurrence Details: Runs at 0:10 on Monday, Tuesday, Wednesday, Thursday, Friday, Saturday every Week)]

In [8]:
fetched_schedule = Schedule.get(ws, '{}'.format('8377260a-2ec2-4239-9a81-085444285bf5'))
fetched_schedule.disable()

# Проверить обновление магазинов

In [57]:
%%time

from datetime import datetime, timedelta

blob_list = list(block_blob_service.list_blob_names('spardata', prefix = 'predictions/'))

blob_actual =  list(filter(lambda x: 'predictions/' in x, blob_list))
generator = block_blob_service.list_blobs(container_name)
prediction = []
for blob in blob_actual:
    lastModified= BlockBlobService.get_blob_properties(block_blob_service,'spardata',blob).properties.last_modified
    prediction.append([int(blob.split('/')[1]), lastModified + timedelta(hours = 3)])
    
prediction_df = pd.DataFrame(data = prediction, columns = ['store_id', 'last_updated_prediction'])

Wall time: 17.2 s


In [58]:
len(prediction_df[prediction_df['last_updated_prediction'] > '2022-09-05 15:00:00'].sort_values('last_updated_prediction', ascending = False))

35

In [59]:
prediction_df[(prediction_df['last_updated_prediction'] > '2022-09-05 15:00:00')]

Unnamed: 0,store_id,last_updated_prediction
50,1346,2022-09-05 16:48:03+00:00
52,1348,2022-09-05 16:52:13+00:00
58,1354,2022-09-05 17:02:32+00:00
59,1356,2022-09-05 16:55:00+00:00
64,1361,2022-09-05 16:59:08+00:00
68,1372,2022-09-05 16:54:46+00:00
72,1378,2022-09-05 16:59:01+00:00
77,1384,2022-09-05 16:59:19+00:00
82,1393,2022-09-05 16:50:15+00:00
85,1396,2022-09-05 16:52:15+00:00


In [38]:
prediction_df[prediction_df['last_updated_prediction'] > '2022-03-31 00:00:00'].sort_values('last_updated_prediction')

Unnamed: 0,store_id,last_updated_prediction
84,1395,2022-03-31 05:02:44+00:00
98,1417,2022-03-31 05:03:01+00:00
50,1346,2022-03-31 13:26:27+00:00
48,1338,2022-03-31 13:27:59+00:00
52,1348,2022-03-31 13:27:59+00:00
...,...,...
169,741,2022-03-31 14:42:51+00:00
128,1509,2022-03-31 14:44:17+00:00
171,764,2022-03-31 14:47:42+00:00
172,768,2022-03-31 14:52:43+00:00


In [40]:
prediction_df[prediction_df['store_id']==1418 ]

Unnamed: 0,store_id,last_updated_prediction
99,1418,2021-12-01 04:19:56+00:00


In [42]:
prediction_df[(prediction_df['last_updated_prediction'] > '2022-02-11 15:30:00')&
             (prediction_df['store_id'] == 1354)]

Unnamed: 0,store_id,last_updated_prediction
58,1354,2022-02-11 16:00:05+00:00


In [43]:
with io.BytesIO() as input_blob:
    block_blob_service.get_blob_to_stream(container_name='spardata',
                                          blob_name = f'predictions/{1399}/prediction.csv',
                                          stream = input_blob)
    input_blob.seek(0)
    prediction = pd.read_csv(input_blob)

In [42]:
prediction

Unnamed: 0,date,item_id,prediction
0,2022-03-16,100464321,0.000000
1,2022-03-17,100464321,0.000000
2,2022-03-18,100464321,0.000000
3,2022-03-19,100464321,0.000000
4,2022-03-20,100464321,0.000000
...,...,...,...
626131,2022-04-22,124225,21.289082
626132,2022-04-23,124225,22.553281
626133,2022-04-24,124225,19.097876
626134,2022-04-25,124225,17.623722


In [44]:
prediction

Unnamed: 0,date,item_id,prediction
0,2022-03-16,100589570,0.0
1,2022-03-17,100589570,0.0
2,2022-03-18,100589570,0.0
3,2022-03-19,100589570,0.0
4,2022-03-20,100589570,0.0
...,...,...,...
166357,2022-04-22,101195855,0.0
166358,2022-04-23,101195855,0.0
166359,2022-04-24,101195855,0.0
166360,2022-04-25,101195855,0.0


In [45]:
prediction[prediction['prediction'] > 0]

Unnamed: 0,date,item_id,prediction
504,2022-03-16,147058,5.000000
505,2022-03-17,147058,4.400000
506,2022-03-18,147058,2.200000
507,2022-03-19,147058,6.000000
508,2022-03-20,147058,3.000000
...,...,...,...
166227,2022-04-18,103355,1.011674
166228,2022-04-19,103355,1.913150
166232,2022-04-23,103355,0.962875
166234,2022-04-25,103355,1.020614
