**Batch processing in Prefect**   
Writing into Redshift has a byte limit that, from experience, is reached after roughly 2-3 days of heavy usage by a single study. So when we need to preprocess more than a day of data (i.e. catch-up jobs), we split the work into a batch of flow runs that each cover only 2-3 days.
https://discourse.prefect.io/t/how-to-map-over-flows-with-various-parameter-values/365

In [1]:
from datetime import date, datetime, timedelta, time  # 'timezone' not imported: later cells reuse that name for a "UTC" string
import pendulum
import pandas as pd

For integrations in Prefect:
* use pendulum?
* get a list of dates between two dates using pandas

In [3]:
# parse a date string with pendulum, then convert the pendulum datetime back to text
end = '2022-03-13'
timezone = "UTC"
end_range=pendulum.parse(end, tz=timezone)
end_string = end_range.strftime('%Y-%m-%d %H:%M')

print(end_string)

2022-03-13 00:00
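The same text conversion can be done with only the standard library when UTC is all we need. A minimal sketch: `to_window_string` is a hypothetical helper, and unlike pendulum it ignores named timezones and assumes the input string carries no UTC offset.

```python
import datetime as dt

def to_window_string(date_text):
    """Parse an ISO date/datetime string as UTC and format it like the pendulum cell."""
    # .replace() attaches UTC; it would overwrite any offset already in the string
    parsed = dt.datetime.fromisoformat(date_text).replace(tzinfo=dt.timezone.utc)
    return parsed.strftime('%Y-%m-%d %H:%M')

print(to_window_string('2022-03-13'))  # 2022-03-13 00:00
```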


In [4]:
# FOR CATCHUP JOBS > 2 DAYS
def bulk_time_params(start, end, delta, tz_input="UTC"):
    '''Split the period from start to end into consecutive windows of `delta` days.

    start, end: date/datetime strings pendulum can parse, e.g. YYYY-MM-DD or YYYY-MM-DD HH:mm:ss
    delta: the number of days each integration (flow run) will cover
    tz_input: timezone name used to interpret start and end
    Returns (starts, ends): lists of 'YYYY-MM-DD HH:MM' strings; ends[i] closes window i.
    '''
    tz = pendulum.timezone(tz_input)
    start_range = pendulum.parse(start, tz=tz)
    end_range = pendulum.parse(end, tz=tz)
    end_string = end_range.strftime('%Y-%m-%d %H:%M')

    # Chunk out flows by `delta` days
    starts = pd.date_range(start_range, end_range, freq=f'{delta}D').strftime('%Y-%m-%d %H:%M').to_list()
    # If end lands exactly on a chunk boundary, it must not start a new window
    if end_string in starts:
        starts.remove(end_string)
    # Each window ends where the next one starts, so the integration is seamless
    ends = starts[1:]
    ends.append(end_string)  # the last window ends at `end`
    print(f"There are {len(starts)} start and {len(ends)} end datetimes in this batch integration.")
    return (starts, ends)

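If pandas or pendulum isn't available (e.g. in a lightweight test), the same chunking can be sketched with only the standard library. This is a minimal sketch, not a drop-in replacement: `chunk_windows` is a hypothetical name, and it assumes ISO-format inputs interpreted as UTC (there is no `tz_input`). It follows the same rule as `bulk_time_params`: window starts step by `delta` days, a start that lands exactly on `end` is dropped, and each window ends where the next one begins.

```python
import datetime as dt

FMT = '%Y-%m-%d %H:%M'

def chunk_windows(start, end, delta_days):
    """Split [start, end) into consecutive windows of at most delta_days (UTC only)."""
    if delta_days <= 0:
        raise ValueError("delta_days must be positive")
    start_dt = dt.datetime.fromisoformat(start).replace(tzinfo=dt.timezone.utc)
    end_dt = dt.datetime.fromisoformat(end).replace(tzinfo=dt.timezone.utc)
    if start_dt >= end_dt:
        raise ValueError("start must be before end")
    step = dt.timedelta(days=delta_days)

    starts = []
    current = start_dt
    # Strict '<' drops a start that falls exactly on end, like starts.remove(...) above
    while current < end_dt:
        starts.append(current)
        current += step
    # Each window ends where the next begins; the last one ends at end
    ends = starts[1:] + [end_dt]
    return [s.strftime(FMT) for s in starts], [e.strftime(FMT) for e in ends]

demo_starts, demo_ends = chunk_windows('2021-11-07', '2022-03-14', 3)
print(len(demo_starts), demo_starts[-1], demo_ends[-1])  # 43 2022-03-13 00:00 2022-03-14 00:00
```

Raising on `start >= end` is a deliberate choice: in that case the pandas version would return zero starts but one end.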
In [5]:
start = '2021-11-07'
end = '2022-03-14'
timezone = "UTC"

params = bulk_time_params(start, end, 3, timezone)

params[0]

There are 43 start and 43 end datetimes in this batch integration.


['2021-11-07 00:00',
 '2021-11-10 00:00',
 '2021-11-13 00:00',
 '2021-11-16 00:00',
 '2021-11-19 00:00',
 '2021-11-22 00:00',
 '2021-11-25 00:00',
 '2021-11-28 00:00',
 '2021-12-01 00:00',
 '2021-12-04 00:00',
 '2021-12-07 00:00',
 '2021-12-10 00:00',
 '2021-12-13 00:00',
 '2021-12-16 00:00',
 '2021-12-19 00:00',
 '2021-12-22 00:00',
 '2021-12-25 00:00',
 '2021-12-28 00:00',
 '2021-12-31 00:00',
 '2022-01-03 00:00',
 '2022-01-06 00:00',
 '2022-01-09 00:00',
 '2022-01-12 00:00',
 '2022-01-15 00:00',
 '2022-01-18 00:00',
 '2022-01-21 00:00',
 '2022-01-24 00:00',
 '2022-01-27 00:00',
 '2022-01-30 00:00',
 '2022-02-02 00:00',
 '2022-02-05 00:00',
 '2022-02-08 00:00',
 '2022-02-11 00:00',
 '2022-02-14 00:00',
 '2022-02-17 00:00',
 '2022-02-20 00:00',
 '2022-02-23 00:00',
 '2022-02-26 00:00',
 '2022-03-01 00:00',
 '2022-03-04 00:00',
 '2022-03-07 00:00',
 '2022-03-10 00:00',
 '2022-03-13 00:00']
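A quick sanity check on the count printed above: for midnight-aligned dates, the number of windows should be ceil(total_days / delta). From 2021-11-07 to 2022-03-14 is 127 days, and ceil(127 / 3) = 43, which matches the 43 start datetimes. `expected_window_count` is a hypothetical helper for this check and only holds for whole-day inputs.

```python
import datetime as dt
import math

def expected_window_count(start, end, delta_days):
    """ceil(total_days / delta_days) for whole-day (midnight) start and end dates."""
    total_days = (end - start).days
    return math.ceil(total_days / delta_days)

print(expected_window_count(dt.date(2021, 11, 7), dt.date(2022, 3, 14), 3))  # 43
```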

The `parameters` argument of the `create_flow_run` task expects a dictionary of the form {"param_name": param_value}, so we build one such dictionary per window.
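Since only the start and end datetimes change between runs, the per-window dictionaries can also be built by merging each window into one shared base dict. A minimal sketch: `build_flow_params` and the `base` values are hypothetical, for illustration only.

```python
def build_flow_params(starts, ends, base_params):
    """Return one parameters dict per (start, end) window.

    base_params holds the values shared by every flow run; only
    "startdatetime" and "enddatetime" differ. The merge is shallow, so
    list values such as "studies" are shared between the dicts.
    """
    if len(starts) != len(ends):
        raise ValueError(f"{len(starts)} starts but {len(ends)} ends")
    return [
        {**base_params, "startdatetime": start_dt, "enddatetime": end_dt}
        for start_dt, end_dt in zip(starts, ends)
    ]

# Hypothetical shared values, for illustration only
base = {"studies": ["example-study-id"], "timezone": "UTC", "port": 5439}
runs = build_flow_params(["2021-11-07 00:00", "2021-11-10 00:00"],
                         ["2021-11-10 00:00", "2021-11-13 00:00"], base)
print(runs[1]["startdatetime"], runs[1]["enddatetime"])
```

Checking that `starts` and `ends` have the same length up front catches a mismatched batch before any flow run is created.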

In [12]:
from prefect.tasks.secrets import PrefectSecret
from prefect import task, Flow, Parameter, context

In [17]:
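studies = "000c0000-0000-0000-8000-000000000000"  # a single study ID
timezone = "UTC"
starts = params[0]
ends = params[1]

# One parameters dict per window; only the start/end datetimes change between flow runs
all_bulk_params = []
for start_dt, end_dt in zip(starts, ends):
    single_flow_params = {"startdatetime": start_dt,
                          "enddatetime": end_dt,
                          "participants": [],
                          "studies": [studies],
                          "timezone": timezone,
                          "export_format": "",
                          "filepath": "",
                          "filename": "",
                          "dbuser": "datascience",
                          "hostname": "chronicle.cey7u7ve7tps.us-west-2.redshift.amazonaws.com",
                          "port": 5439,
                          # NOTE: PrefectSecret is a Prefect Task, not the secret's value. Created outside
                          # a `with Flow(...)` block it never runs, so this entry holds a Task object.
                          "password": PrefectSecret("dbpassword")}
    all_bulk_params.append(single_flow_params)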


  my_task = Parameter(...)  # static (non-Task) args go here
  res = my_task(...)  # dynamic (Task) args go here

see https://docs.prefect.io/core/concepts/flows.html#apis for more info.
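The warning above is Prefect reporting that a Task was passed where a plain value was expected. `PrefectSecret("dbpassword")` is likewise a Task, so each dict holds a Task object rather than the password string. As far as I understand Prefect 1.x, `create_flow_run` sends `parameters` to the API as JSON, so a cheap pre-flight check is to confirm every value is JSON-serializable. A sketch under that assumption: `find_unserializable` is a hypothetical helper and `FakeTask` is a stand-in for a real Prefect Task.

```python
import json

def find_unserializable(params_list):
    """Return (index, key) pairs whose values json.dumps cannot encode."""
    problems = []
    for i, params in enumerate(params_list):
        for key, value in params.items():
            try:
                json.dumps(value)
            except (TypeError, ValueError):
                problems.append((i, key))
    return problems

class FakeTask:  # hypothetical stand-in for a Prefect Task object
    pass

sample = [{"startdatetime": "2021-11-07 00:00", "port": 5439, "password": FakeTask()}]
print(find_unserializable(sample))  # [(0, 'password')]
```

If the check flags `password`, one option (assuming the child flow can be changed) is to drop it from the parameters and have `preprocessing_daily` read `PrefectSecret("dbpassword")` itself.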


In [18]:
all_bulk_params

[{'startdatetime': '2021-11-07 00:00',
  'enddatetime': '2021-11-10 00:00',
  'participants': [],
  'studies': ['000c0000-0000-0000-8000-000000000000'],
  'timezone': 'UTC',
  'export_format': '',
  'filepath': '',
  'filename': '',
  'dbuser': 'datascience',
  'hostname': 'chronicle.cey7u7ve7tps.us-west-2.redshift.amazonaws.com',
  'port': 5439,
  'password': <Parameter: password>},
 {'startdatetime': '2021-11-10 00:00',
  'enddatetime': '2021-11-13 00:00',
  'participants': [],
  'studies': ['000c0000-0000-0000-8000-000000000000'],
  'timezone': 'UTC',
  'export_format': '',
  'filepath': '',
  'filename': '',
  'dbuser': 'datascience',
  'hostname': 'chronicle.cey7u7ve7tps.us-west-2.redshift.amazonaws.com',
  'port': 5439,
  'password': <Parameter: password>},
 {'startdatetime': '2021-11-13 00:00',
  'enddatetime': '2021-11-16 00:00',
  'participants': [],
  'studies': ['000c0000-0000-0000-8000-000000000000'],
  'timezone': 'UTC',
  'export_format': '',
  'filepath': '',
  'filename

This is how we will launch the mapped flow runs, one per parameters dict (not run here):

In [None]:
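from prefect import Flow, unmapped
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

# Tasks must be called inside a Flow context; "batch_preprocessing" is a placeholder parent-flow name
with Flow("batch_preprocessing") as parent_flow:
    # One child run of the registered "preprocessing_daily" flow per parameters dict
    mapped_flows = create_flow_run.map(
        parameters=all_bulk_params,
        flow_name=unmapped("preprocessing_daily"),
        project_name=unmapped("Preprocessing"),
    )
    # Wait for every child run; raise_final_state fails the parent if a child run fails
    wait_for_flow_run.map(mapped_flows, raise_final_state=unmapped(True))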
