# Scheduled NEMS Data fetching to Database

In [None]:
import os
from os.path import join

# Project root; the .env file with the database settings is loaded from here.
ROOT = '/home/sdc/DR_DemandForecast/emcData'
# Prefix for the CSV dumps that the fetch functions write when CRONTAB is False.
DATADIR = '/home/sdc/DR_DemandForecast/emcData/data/'
# When True, the fetch functions skip writing raw responses to DATADIR.
CRONTAB = True

In [None]:
from dep import nemsData2 as nems
import pandas as pd
import urllib3
from sqlalchemy import create_engine, text
import os
from dotenv import load_dotenv

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

In [None]:
import datetime as dt
from datetime import timedelta as delta
import pytz
import time as t

## Fetch Data

### Methods

#### Corp Data

In [None]:
def corp_for_dpr(corp_df):
    """Prepare a raw Corp report for building the DPR row.

    Works on a copy, so the caller's frame is not modified. ``Date`` and
    ``Period`` are moved to the front, ``reportType`` and ``secondaryReserve``
    are dropped, ``Period`` is cast to ``int`` and every value column is
    converted with ``pd.to_numeric(errors='coerce')`` so entries that cannot
    be parsed (for example the text ``'None'``) become ``NaN``.

    Args:
        corp_df: Raw Corp report frame.

    Returns:
        A new DataFrame with the reordered, converted columns.
    """
    key_cols = ['Date', 'Period']
    dropped_cols = ['reportType', 'secondaryReserve']
    value_cols = [name for name in corp_df.columns
                  if name not in key_cols + dropped_cols]
    prepared = corp_df.copy()[key_cols + value_cols]

    prepared['Period'] = prepared['Period'].astype(int)

    # Unparsable values are coerced to NaN rather than raising.
    numeric_cols = ['Demand', 'TCL', 'USEP', 'LCP', 'Regulation', 'PrimaryReserve',
                    'ContingencyReserve', 'EHEUR', 'Solar']
    prepared[numeric_cols] = prepared[numeric_cols].apply(pd.to_numeric, errors='coerce')

    return prepared


def corp_for_lar(corp_df):
    """Prepare a raw Corp report for building LAR rows.

    Works on a copy. Keeps ``Date`` and ``Period`` first, followed by every
    column that is not in the exclusion list below, casts ``Period`` to
    ``int`` and converts the three reserve columns with
    ``pd.to_numeric(errors='coerce')``.

    Args:
        corp_df: Raw Corp report frame.

    Returns:
        A new DataFrame with the reduced, converted columns.
    """
    key_cols = ['Date', 'Period']
    excluded_cols = key_cols + ['reportType', 'secondaryReserve', 'Demand', 'TCL',
                                'USEP', 'LCP', 'EHEUR', 'Solar']
    kept_cols = [name for name in corp_df.columns if name not in excluded_cols]
    prepared = corp_df.copy()[key_cols + kept_cols]

    prepared['Period'] = prepared['Period'].astype(int)

    # Unparsable values are coerced to NaN rather than raising.
    reserve_cols = ['Regulation', 'PrimaryReserve', 'ContingencyReserve']
    prepared[reserve_cols] = prepared[reserve_cols].apply(pd.to_numeric, errors='coerce')

    return prepared

#### MCR010 Data

In [None]:
def get_mcr010(mcr_serie):
    """Fetch the MCR010 report for the first entry of ``mcr_serie``.

    ``mcr_serie.iloc[0]`` is passed to ``nems.getMCRReport`` together with the
    report name; an empty ``mcr_serie`` therefore raises ``IndexError``.
    """
    return nems.getMCRReport('MCR010', mcr_serie.iloc[0])

#### MCR012 Data

In [None]:
def get_mcr012(mcrSerie):
    """Fetch the MCR012 report for the first entry of ``mcrSerie``.

    ``mcrSerie.iloc[0]`` is passed to ``nems.getMCRReport`` together with the
    report name; an empty ``mcrSerie`` therefore raises ``IndexError``.
    """
    return nems.getMCRReport('MCR012', mcrSerie.iloc[0])

#### Network Traffic Control 

In [None]:
def wait_retry(func, targetType=pd.DataFrame, retries=10, success_wait=10, failure_wait=30):
    """Obtain a result of type ``targetType``, retrying and pausing between attempts.

    ``func`` may be either:

    * a zero-argument callable (e.g. ``lambda: nems.getCorp(day)``) - it is
      invoked on every attempt, so exceptions and wrong-typed results are
      actually retried; or
    * an already computed value (how the existing callers in this file use
      it) - the value is only validated, because there is nothing to re-run.

    Args:
        func: Callable producing the data, or the data itself.
        targetType: Type (or tuple of types) the result must be an instance of.
        retries: Maximum number of attempts.
        success_wait: Seconds to sleep after a successful attempt.
        failure_wait: Seconds to sleep after a failed attempt before retrying.

    Returns:
        The validated result, or ``None`` when no attempt produced one.
    """
    # A value that already has the target type is never called, even if it is callable.
    retryable = callable(func) and not isinstance(func, targetType)

    for _ in range(retries):
        try:
            data = func() if retryable else func

            if not isinstance(data, targetType):
                print(data)
                raise TypeError(f"Expected {targetType} but got {type(data)}")

            print('.', end='')
            t.sleep(success_wait)  # Sleep awhile if successfully fetched data
            return data

        except Exception as e:
            print(f"In API call, error message: {e}")
            if not retryable:
                # A plain value cannot change between attempts; waiting is pointless.
                break
            t.sleep(failure_wait)  # Sleep for a long time to wait for next try

    return None


In [None]:
def fetch_corp(
    date_today: dt.date,
    need_tomorrow_data: bool,
    date_tomorrow: dt.date,
):
    """Fetch the Corp report for today and, when requested, for tomorrow.

    Today's report is shaped with ``corp_for_dpr`` and tomorrow's with
    ``corp_for_lar``. Raw responses are written to ``DATADIR`` as CSV when
    ``CRONTAB`` is False.

    NOTE(review): the ``nems.getCorp`` call is evaluated before ``wait_retry``
    receives its result, so an exception raised by the call itself is not
    retried.

    Returns:
        ``(corp_today, corp_tomorrow)``; ``corp_tomorrow`` is ``None`` when
        ``need_tomorrow_data`` is False.
    """
    print(" Corp", end="")
    raw_today = wait_retry(nems.getCorp(date_today.strftime('%d-%b-%Y')))
    if not CRONTAB:
        today_csv = f"{DATADIR}Corp_{date_today.strftime(format='%Y-%m-%d')}.csv"
        raw_today.to_csv(today_csv, index=False)
    corp_today = corp_for_dpr(raw_today)

    if not need_tomorrow_data:
        print("_", end="")
        return corp_today, None

    raw_tomorrow = wait_retry(nems.getCorp(date_tomorrow.strftime('%d-%b-%Y')))
    if not CRONTAB:
        tomorrow_csv = f"{DATADIR}Corp_{date_tomorrow.strftime(format='%Y-%m-%d')}.csv"
        raw_tomorrow.to_csv(tomorrow_csv, index=False)

    return corp_today, corp_for_lar(raw_tomorrow)

In [None]:
def fetch_mcr_dpr(
    date_today: dt.date,
    period_now: int
):
    """Fetch the MCR010 and MCR012 reports for the current period.

    The MCR001 listing (load scenario ``'M'``) is fetched first and filtered
    to the rows whose ``FirstDate``/``FirstPeriod`` equal ``date_today`` and
    ``period_now``; that selection is handed to ``get_mcr010`` and
    ``get_mcr012``. Each response is written to ``DATADIR`` as CSV when
    ``CRONTAB`` is False.

    Returns:
        ``(mcr010_df, mcr012_df)``.
    """
    stamp = date_today.strftime('%d-%b-%Y')

    def dump(frame, report):
        # Keep a CSV copy of the raw response unless CRONTAB is set.
        if not CRONTAB:
            frame.to_csv(f"{DATADIR}{report}_DPR_{stamp}_{period_now}.csv", index=False)

    print(" MCR001", end="")
    listing = wait_retry(nems.getMCR001(stamp, 'M'))
    dump(listing, 'MCR001')
    is_current = (listing['FirstDate'] == date_today) & (listing['FirstPeriod'] == period_now)
    mcr_serie = listing[is_current].copy()

    print(" MCR010", end="")
    mcr010_df = wait_retry(get_mcr010(mcr_serie))
    dump(mcr010_df, 'MCR010')

    print(" MCR012", end="")
    mcr012_df = wait_retry(get_mcr012(mcr_serie))
    dump(mcr012_df, 'MCR012')

    return mcr010_df, mcr012_df

In [None]:
def fetch_mcr_lar(
    date_today: dt.date,
    date_lar: dt.date,
    period_lar: int
):
    """Fetch the LAR MCR010 and MCR012 reports for load scenarios H, M and L.

    Requests are issued in three passes, each in H, M, L order: the MCR001
    listings (filtered to ``date_lar``/``period_lar``), then the MCR010
    reports, then the MCR012 reports. When ``CRONTAB`` is False every
    response is written to ``DATADIR``; MCR001 file names carry ``date_lar``
    while MCR010/MCR012 file names carry ``date_today``.

    Returns:
        ``(mcr010_h, mcr012_h, mcr010_m, mcr012_m, mcr010_l, mcr012_l)``.
    """
    scenarios = ('H', 'M', 'L')
    lar_stamp = date_lar.strftime('%d-%b-%Y')
    today_stamp = date_today.strftime('%d-%b-%Y')

    def dump(frame, report, scenario, stamp):
        # Keep a CSV copy of the raw response unless CRONTAB is set.
        if not CRONTAB:
            frame.to_csv(
                f"{DATADIR}{report}_LAR_{scenario}_{stamp}_{period_lar}.csv", index=False)

    # MCR001: one listing per scenario, reduced to the LAR start date/period.
    selections = {}
    for scenario in scenarios:
        listing = wait_retry(nems.getMCR001(lar_stamp, scenario, runType='LAR'))
        dump(listing, 'MCR001', scenario, lar_stamp)
        is_lar_start = (listing['FirstDate'] == date_lar) & (listing['FirstPeriod'] == period_lar)
        selections[scenario] = listing[is_lar_start].copy()

    # MCR010
    mcr010 = {}
    for scenario in scenarios:
        mcr010[scenario] = wait_retry(get_mcr010(selections[scenario]))
        dump(mcr010[scenario], 'MCR010', scenario, today_stamp)

    # MCR012
    mcr012 = {}
    for scenario in scenarios:
        mcr012[scenario] = wait_retry(get_mcr012(selections[scenario]))
        dump(mcr012[scenario], 'MCR012', scenario, today_stamp)

    return (mcr010['H'], mcr012['H'],
            mcr010['M'], mcr012['M'],
            mcr010['L'], mcr012['L'])

## Join Data

### Methods

#### DPR

In [None]:
def mcr010_for_dpr(mcr010_df):
    """Return the MCR010 columns needed for the DPR row.

    Selects the forecast key columns plus ``CUSEP``, ``TransmissionLoss``,
    ``EnergyShortfall`` and ``RLQ`` into a new frame and casts
    ``ForecastPeriod`` to ``int``. The input frame is not modified.
    """
    wanted_cols = ['ForecastDate', 'ForecastPeriod', 'CUSEP',
                   'TransmissionLoss', 'EnergyShortfall', 'RLQ']
    trimmed = mcr010_df.copy()[wanted_cols].copy()
    trimmed['ForecastPeriod'] = trimmed['ForecastPeriod'].astype(int)
    return trimmed

In [None]:
def mcr012_for_dpr(mcr012_df):
    """Flatten the per-service MCR012 rows into one wide row.

    Every input row contributes two columns named after its
    ``AncillaryService``; ``ForecastDate``/``ForecastPeriod`` are taken from
    the last row processed. The service-prefixed columns are then renamed to
    the DPR column names and ``ForecastPeriod`` is cast to ``int``.

    Returns:
        A single-row DataFrame (later rows overwrite earlier ones that share
        the same service).
    """
    flat = {}
    for _, entry in mcr012_df.copy().iterrows():
        service = entry['AncillaryService']
        # Existing keys keep their position, so the date/period stay first.
        flat.update({
            "ForecastDate": [entry["ForecastDate"]],
            "ForecastPeriod": [entry["ForecastPeriod"]],
            f"{service}_ReserveRequirement": [entry["ReserveRequirementMW"]],
            f"{service}_RegulationShortfall": [entry["RegulationShortfallMW"]],
        })

    dpr_names = {
        'REGULATION_ReserveRequirement': 'RegulationRequirement',
        'REGULATION_RegulationShortfall': 'RegulationShortfall',
        'PRIMARY RESERVE_ReserveRequirement': 'PrimaryReserveRequirement',
        'PRIMARY RESERVE_RegulationShortfall': 'PrimaryReserveShortfall',
        'CONTINGENCY RESERVE_ReserveRequirement': 'ContingencyReserveRequirement',
        'CONTINGENCY RESERVE_RegulationShortfall': 'ContingencyReserveShortfall',
    }
    wide = pd.DataFrame(flat).rename(columns=dpr_names)
    wide['ForecastPeriod'] = wide['ForecastPeriod'].astype(int)

    return wide

In [None]:
def get_dpr(corp_peri_df, mcr010_df, mcr012_df):
    """Join the Corp, MCR010 and MCR012 data into the DPR frame.

    The two MCR frames are inner-joined on the forecast keys, the keys are
    renamed to ``Date``/``Period`` and the result is inner-joined with the
    Corp rows. Every value column is cast to ``float64``, missing values are
    replaced by 0 and the columns are put into the DPR column order.

    Returns:
        The DPR DataFrame (empty when the inner joins find no match).
    """
    mcr_df = pd.merge(
        mcr010_df, mcr012_df, how='inner', on=['ForecastDate', 'ForecastPeriod']
    ).rename(columns={'ForecastDate': 'Date', 'ForecastPeriod': 'Period'})

    dpr_df = pd.merge(corp_peri_df, mcr_df, how='inner', on=['Date', 'Period'])

    # Value columns in final output order; all of them are stored as float64.
    value_cols = [
        'Demand', 'TCL', 'USEP', 'CUSEP', 'LCP', 'TransmissionLoss', 'EnergyShortfall',
        'RLQ', 'Regulation', 'RegulationRequirement', 'RegulationShortfall',
        'PrimaryReserve', 'PrimaryReserveRequirement', 'PrimaryReserveShortfall',
        'ContingencyReserve', 'ContingencyReserveRequirement',
        'ContingencyReserveShortfall', 'EHEUR', 'Solar',
    ]
    dpr_df[value_cols] = dpr_df[value_cols].astype('float64')

    dpr_df = dpr_df.fillna(0)
    dpr_df = dpr_df[['Date', 'Period'] + value_cols]

    print(".", end="")
    return dpr_df

#### LAR

In [None]:
def mcr010_for_lar(date_today, period_now, mcr010_df, load_scenario):
    """Shape an MCR010 report into the LAR layout for one load scenario.

    Selects the forecast columns, appends ``Period``, ``Date`` and
    ``LoadScenario`` columns holding the given values, and renames
    ``TotalLoad`` to ``Demand``. The input frame is not modified.
    """
    source_cols = ['ForecastDate', 'ForecastPeriod', 'TotalLoad', 'TCL', 'USEP',
                   'CUSEP', 'LCP', 'TransmissionLoss', 'EnergyShortfall', 'RLQ',
                   'EHEUR', 'Solar']

    # The new columns are appended in keyword order: Period, Date, LoadScenario.
    return (
        mcr010_df[source_cols]
        .assign(Period=period_now, Date=date_today, LoadScenario=load_scenario)
        .rename(columns={'TotalLoad': 'Demand'})
    )

In [None]:
def mcr012_for_lar(date_today, period_now, mcr012_df):
    """Flatten MCR012 rows into one wide row per forecast period.

    The rows are consumed three at a time: ``ForecastDate`` and
    ``ForecastPeriod`` are recorded on every third row, and each row's
    requirement/shortfall values go to the column pair of its
    ``AncillaryService``. ``Date`` and ``Period`` columns holding the given
    values are appended.

    NOTE(review): this relies on the report listing exactly the three
    services for every forecast period; otherwise the collected lists differ
    in length and the DataFrame construction fails - confirm with the source.
    """
    service_columns = {
        'REGULATION': ('RegulationRequirement', 'RegulationShortfall'),
        'PRIMARY RESERVE': ('PrimaryReserveRequirement', 'PrimaryReserveShortfall'),
        'CONTINGENCY RESERVE': ('ContingencyReserveRequirement', 'ContingencyReserveShortfall'),
    }
    collected = {
        name: [] for name in (
            'ForecastDate',
            'ForecastPeriod',
            'RegulationRequirement',
            'RegulationShortfall',
            'PrimaryReserveRequirement',
            'PrimaryReserveShortfall',
            'ContingencyReserveRequirement',
            'ContingencyReserveShortfall',
        )
    }

    for position, (_, entry) in enumerate(mcr012_df.copy().iterrows(), start=1):
        # Record the forecast key once per group of three rows.
        if position % 3 == 0:
            collected['ForecastDate'].append(entry["ForecastDate"])
            collected['ForecastPeriod'].append(entry["ForecastPeriod"])

        targets = service_columns.get(entry['AncillaryService'])
        if targets is not None:
            requirement_col, shortfall_col = targets
            collected[requirement_col].append(entry["ReserveRequirementMW"])
            collected[shortfall_col].append(entry["RegulationShortfallMW"])

    flattened = pd.DataFrame(collected)
    flattened['Date'] = date_today
    flattened['Period'] = period_now

    return flattened

In [None]:
def get_lar(corp_peri_df: pd.DataFrame, mcr010_df, mcr012_df):
    """Join the Corp, MCR010 and MCR012 data into the LAR frame.

    The MCR frames are left-joined on the forecast keys plus ``Date`` and
    ``Period``. The Corp rows get their ``Date``/``Period`` relabelled as
    ``ForecastDate``/``ForecastPeriod`` and are left-joined with the MCR
    data. Value columns are cast to ``float64``, missing values are replaced
    by 0 and the columns are put into the LAR column order.

    Args:
        corp_peri_df: Corp rows for the forecast window. It is NOT modified;
            the relabelling is done on a copy so the same frame can be passed
            again for another load scenario.
        mcr010_df: Output of ``mcr010_for_lar``.
        mcr012_df: Output of ``mcr012_for_lar``.

    Returns:
        The LAR DataFrame, one row per Corp row.
    """
    mcr_df = pd.merge(mcr010_df, mcr012_df, how='left',
                      on=['ForecastDate', 'ForecastPeriod', 'Date', 'Period'])

    # Non-inplace rename: the original inplace rename altered the caller's frame.
    corp_forecast_df = corp_peri_df.rename(
        columns={
            "Date": "ForecastDate",
            "Period": "ForecastPeriod"
        }
    )[[
        "ForecastDate", "ForecastPeriod",
        'Regulation', 'PrimaryReserve', 'ContingencyReserve'
    ]]

    lar_df = pd.merge(corp_forecast_df, mcr_df, how='left',
                      on=['ForecastDate', 'ForecastPeriod'])

    float64_cols = ['Demand', 'TCL', 'USEP', 'CUSEP', 'LCP', 'TransmissionLoss',
                    'EnergyShortfall', 'RLQ', 'EHEUR', 'RegulationRequirement', 'Regulation',
                    'RegulationShortfall', 'PrimaryReserveRequirement', 'PrimaryReserve',
                    'PrimaryReserveShortfall', 'ContingencyReserveRequirement',
                    'ContingencyReserve', 'ContingencyReserveShortfall', 'Solar']

    lar_df[float64_cols] = lar_df[float64_cols].astype('float64')

    # Applies to every column, so key columns of unmatched rows also become 0.
    lar_df = lar_df.fillna(0)

    lar_df = lar_df[['Date', 'Period', 'LoadScenario', 'ForecastDate', 'ForecastPeriod']
                    + float64_cols]

    print(".", end="")
    return lar_df

## Save to DB

### Methods

#### DB Connection

In [None]:
# Load the environment variables from the .env file located in ROOT
env_file = join(ROOT, '.env')
load_dotenv(env_file)

# Get the values of host, user, pswd and db from the environment variables.
# os.getenv returns None for a variable that is not set, which would end up as
# the text "None" inside the connection URL below.
DBHOST = os.getenv('host')
DBUSER = os.getenv('user')
DBPSWD = os.getenv('pswd')
DBNAME = os.getenv('db')
# Schema passed to the save functions and placed on the connection's search_path.
SCHEMA = 'public'

# Build the engine and open the single module-level connection used by every
# database function in this file. "%3D" is the URL-encoded "=" of
# "-csearch_path=<SCHEMA>".
# NOTE(review): user and password are interpolated into the URL unescaped;
# credentials containing characters such as "@" or "/" would need URL-quoting.
engine = create_engine(f"postgresql://{DBUSER}:{DBPSWD}@{DBHOST}/{DBNAME}?options=-csearch_path%3D{SCHEMA}", echo=False)
conn = engine.connect()

#### DPR to DB

In [None]:
def db_dpr(dpr_df, schema="emcdata"):
    """Insert or update the DPR row held in the first row of ``dpr_df``.

    A row of ``"RealTimeDPR"`` is identified by ``"Date"`` and ``"Period"``.
    When such a row exists every column is updated, otherwise a new row is
    inserted. The statement is committed on the module-level ``conn``.

    All values are sent as bound parameters instead of being formatted into
    the SQL text. Only identifiers (schema, table and column names) are
    interpolated, because they cannot be bound; they come from the caller
    and from the DataFrame's column names.

    Args:
        dpr_df: DataFrame whose first row is written. Columns whose name
            contains ``Date`` must hold objects with ``strftime``; every
            other column must be convertible with ``float``.
        schema: Database schema that contains ``"RealTimeDPR"``.

    Returns:
        ``1`` when a new row was inserted, ``0`` when an existing row was
        updated.

    Raises:
        IndexError: If ``dpr_df`` has no rows.
    """
    dpr_se = dpr_df.iloc[0]
    table = f'{schema}."RealTimeDPR"'

    # Key that identifies the row.
    key_clause = '"Date" = :key_date AND "Period" = :key_period'
    key_params = {
        "key_date": dpr_se['Date'].strftime('%Y-%m-%d'),
        "key_period": int(dpr_se['Period']),
    }

    # One bound parameter (v0, v1, ...) per column, in column order.
    columns = []
    value_params = {}
    for position, (column, value) in enumerate(dpr_se.to_dict().items()):
        columns.append(column)
        if 'Date' in column:
            value_params[f"v{position}"] = value.strftime('%Y-%m-%d')
        else:
            value_params[f"v{position}"] = float(value)

    row_exists = conn.execute(
        text(f'SELECT 1 FROM {table} WHERE {key_clause};'), key_params
    ).scalar()

    if row_exists:
        assignments = ", ".join(
            f'"{column}" = :v{position}' for position, column in enumerate(columns)
        )
        conn.execute(
            text(f'UPDATE {table} SET {assignments} WHERE {key_clause};'),
            {**value_params, **key_params},
        )
        conn.commit()
        print("*", end="")
        return 0

    column_list = ", ".join(f'"{column}"' for column in columns)
    placeholders = ", ".join(f':v{position}' for position in range(len(columns)))
    conn.execute(
        text(f'INSERT INTO {table} ({column_list}) VALUES ({placeholders});'),
        value_params,
    )
    conn.commit()
    print(".", end="")
    return 1

#### LAR to DB

In [None]:
def db_lar(lar_df: pd.DataFrame, schema="emcdata"):
    """Insert or update every row of ``lar_df`` in ``"RealTimeLAR"``.

    A row is identified by ``"Date"``, ``"Period"``, ``"LoadScenario"``,
    ``"ForecastDate"`` and ``"ForecastPeriod"``. Existing rows are updated,
    missing rows are inserted; each row is committed on the module-level
    ``conn`` right after it is written.

    All values are sent as bound parameters instead of being formatted into
    the SQL text. Only identifiers (schema, table and column names) are
    interpolated, because they cannot be bound; they come from the caller
    and from the DataFrame's column names.

    Args:
        lar_df: Rows to write. Columns whose name contains ``Date`` must hold
            objects with ``strftime``, columns whose name contains
            ``Scenario`` are stored as text, every other column must be
            convertible with ``float``.
        schema: Database schema that contains ``"RealTimeLAR"``.

    Returns:
        ``(new, exist)``: the number of inserted and of updated rows.
    """
    table = f'{schema}."RealTimeLAR"'
    columns = list(lar_df.columns)

    # The statements are identical for every row, so they are built once.
    key_clause = (
        '"Date" = :key_date AND "Period" = :key_period AND '
        '"LoadScenario" = :key_scenario AND '
        '"ForecastDate" = :key_forecast_date AND '
        '"ForecastPeriod" = :key_forecast_period'
    )
    assignments = ", ".join(
        f'"{column}" = :v{position}' for position, column in enumerate(columns)
    )
    column_list = ", ".join(f'"{column}"' for column in columns)
    placeholders = ", ".join(f':v{position}' for position in range(len(columns)))

    select_stmt = text(f'SELECT 1 FROM {table} WHERE {key_clause};')
    update_stmt = text(f'UPDATE {table} SET {assignments} WHERE {key_clause};')
    insert_stmt = text(f'INSERT INTO {table} ({column_list}) VALUES ({placeholders});')

    new = 0
    exist = 0

    for row in range(lar_df.shape[0]):
        lar_se = lar_df.iloc[row]

        key_params = {
            "key_date": lar_se['Date'].strftime('%Y-%m-%d'),
            "key_period": int(lar_se['Period']),
            "key_scenario": str(lar_se['LoadScenario']),
            "key_forecast_date": lar_se['ForecastDate'].strftime('%Y-%m-%d'),
            "key_forecast_period": int(lar_se['ForecastPeriod']),
        }

        # One bound parameter (v0, v1, ...) per column, in column order.
        value_params = {}
        for position, column in enumerate(columns):
            value = lar_se[column]
            if 'Date' in column:
                value_params[f"v{position}"] = value.strftime('%Y-%m-%d')
            elif 'Scenario' in column:
                value_params[f"v{position}"] = str(value)
            else:
                value_params[f"v{position}"] = float(value)

        row_exists = conn.execute(select_stmt, key_params).scalar()

        if row_exists:
            conn.execute(update_stmt, {**value_params, **key_params})
            conn.commit()
            print("*", end="")
            exist += 1
        else:
            conn.execute(insert_stmt, value_params)
            conn.commit()
            print(".", end="")
            new += 1

    return new, exist

## Main Functions

### Methods

In [None]:
def fetch_data(
    date_today: dt.date,
    period_now: int,
    need_tomorrow_data: bool,
    date_tomorrow: dt.date,
    date_lar: dt.date,
    period_lar: int
):
    """Fetch every report needed for one run, in Corp, DPR, LAR order.

    Returns:
        A 10-tuple: ``corp_today``, ``corp_tomorrow``, the DPR
        ``mcr010``/``mcr012`` frames, then the LAR ``mcr010``/``mcr012``
        frames for scenarios H, M and L (as returned by ``fetch_mcr_lar``).
    """
    corp_today, corp_tomorrow = fetch_corp(date_today, need_tomorrow_data, date_tomorrow)
    mcr010_df, mcr012_df = fetch_mcr_dpr(date_today, period_now)
    lar_frames = fetch_mcr_lar(date_today, date_lar, period_lar)

    return (corp_today, corp_tomorrow, mcr010_df, mcr012_df, *lar_frames)

In [None]:
def join_data(
    date_today,
    period_now,
    date_lar,
    period_lar,
    need_tomorrow_data,
    date_tomorrow,

    corp_today, corp_tomorrow,
    mcr010_df, mcr010_h_df, mcr010_m_df, mcr010_l_df,
    mcr012_df, mcr012_h_df, mcr012_m_df, mcr012_l_df
):
    """Build the DPR frame and the three LAR frames from the fetched reports.

    The DPR frame uses the Corp row of ``date_today``/``period_now``. The LAR
    frames use the Corp rows of the 12 periods starting at
    ``date_lar``/``period_lar``; when that window runs past period 48 the
    remaining periods are taken from ``corp_tomorrow``.

    Returns:
        ``(dpr_df, lar_h_df, lar_m_df, lar_l_df)``.
    """
    # --- DPR ---
    print(" DPR", end='')
    is_current = (corp_today['Date'] == date_today) & (corp_today['Period'] == period_now)
    corp_peri_df = corp_today[is_current].copy()
    dpr_df = get_dpr(corp_peri_df, mcr010_for_dpr(mcr010_df), mcr012_for_dpr(mcr012_df))

    # --- LAR: Corp rows of the forecast window ---
    print(" LAR", end='')
    in_window = (
        (corp_today['Date'] == date_lar)
        & (corp_today['Period'] >= period_lar)
        & (corp_today['Period'] < period_lar + 12)
    )
    corp_df = corp_today[in_window].copy()

    if need_tomorrow_data:
        # Example 1: now is period 37 -> today covers periods 38-48 (11 periods),
        #            tomorrow covers period 1.
        # Example 2: now is period 48 -> today covers nothing,
        #            tomorrow covers periods 1-12.
        period_lar_end = 12 - (48 - period_now)
        is_spill = (
            (corp_tomorrow['Date'] == date_tomorrow)
            & (corp_tomorrow['Period'] <= period_lar_end)
        )
        corp_df = pd.concat([corp_df, corp_tomorrow[is_spill].copy()])

    # --- LAR: one frame per load scenario, built in H, M, L order ---
    scenario_inputs = (
        ('H', mcr010_h_df, mcr012_h_df),
        ('M', mcr010_m_df, mcr012_m_df),
        ('L', mcr010_l_df, mcr012_l_df),
    )
    lar_frames = []
    for scenario, mcr010_src, mcr012_src in scenario_inputs:
        mcr010_lar_df = mcr010_for_lar(date_today, period_now, mcr010_src, scenario)
        mcr012_lar_df = mcr012_for_lar(date_today, period_now, mcr012_src)
        lar_frames.append(get_lar(corp_df, mcr010_lar_df, mcr012_lar_df))

    lar_h_df, lar_m_df, lar_l_df = lar_frames
    return dpr_df, lar_h_df, lar_m_df, lar_l_df

In [None]:
def save_to_db(
    dpr_df,
    lar_h_df,
    lar_m_df,
    lar_l_df
):
    """Write the DPR row and the three LAR frames to the database.

    Uses ``db_dpr`` and ``db_lar`` with the module-level ``SCHEMA``, commits
    on ``conn`` after each stage and prints how many rows were updated and
    how many were added.
    """
    print(" DPR", end='')
    # db_dpr reports 1 for an insert and 0 for an update.
    inserted = 1 if db_dpr(dpr_df, SCHEMA) else 0
    updated = 1 - inserted
    conn.commit()

    print(" LAR", end='')
    for lar_df in (lar_h_df, lar_m_df, lar_l_df):
        lar_new, lar_existing = db_lar(lar_df, SCHEMA)
        inserted += lar_new
        updated += lar_existing
    conn.commit()

    print(f" Updated {updated:2d} rows. Added {inserted:2d} rows.", end='')


In [None]:

def run(date: dt.datetime, period: "int | None" = None, tag='S'):
    """Fetch, join and store the DPR and LAR data for one date and period.

    Args:
        date: Datetime whose calendar date is processed.
        period: Half-hour period (1-48) to process. When omitted (or falsy)
            the period containing the current Asia/Singapore time is used.
            The parameter previously had no default although every call in
            this file omits it.
        tag: Label printed at the start of the progress line.
    """
    run_start = t.time()

    # ----------

    date_today = date.date()

    # Period 1 starts at 00:00, period 2 at 00:30, ... period 48 at 23:30.
    now = dt.datetime.now(pytz.timezone('Asia/Singapore'))
    period_now = now.hour * 2 + now.minute // 30 + 1

    # Overwrite with provided period
    if period:
        period_now = period

    # If 12 periods after current one include periods of tomorrow.
    # A.k.a Starts from period 37.
    need_tomorrow_data = period_now >= 37
    date_tomorrow = date_today + delta(days=1) if need_tomorrow_data else None

    # LAR specific start date and period: the period right after the current one.
    if period_now == 48:
        date_lar = date_today + delta(days=1)
        period_lar = 1
    else:
        date_lar = date_today
        period_lar = period_now + 1

    # The time shown is the wall-clock time of the run, not of the processed period.
    print(f"[{tag}][{date_today.strftime(format='%d-%b-%Y')} {now.time().strftime(format='%H:%M')} P-{period_now:0>2d}]", end="")

    (corp_today, corp_tomorrow,
     mcr010_df, mcr012_df,
     mcr010_h_df, mcr012_h_df,
     mcr010_m_df, mcr012_m_df,
     mcr010_l_df, mcr012_l_df) = fetch_data(
        date_today, period_now, need_tomorrow_data, date_tomorrow, date_lar, period_lar)

    dpr_df, lar_h_df, lar_m_df, lar_l_df = join_data(
        date_today, period_now, date_lar, period_lar, need_tomorrow_data, date_tomorrow,
        corp_today, corp_tomorrow, mcr010_df, mcr010_h_df, mcr010_m_df, mcr010_l_df, mcr012_df, mcr012_h_df, mcr012_m_df, mcr012_l_df
    )

    save_to_db(dpr_df, lar_h_df, lar_m_df, lar_l_df)

    # ----------

    run_end = t.time()
    print(f" [{(run_end - run_start):.2f}s]")


### Main Process

Run for current period.

In [None]:
# Current time in the Asia/Singapore timezone; passed to run() as the date to process.
now = dt.datetime.now(pytz.timezone('Asia/Singapore'))

In [None]:
# Counter used as the tag printed in front of every run's progress line.
task_count = 1
# period=None makes run() use the period of the current Singapore time;
# run() declares `period` without a default, so it is passed explicitly.
run(date=now, period=None, tag=task_count)
# Pause before the next cell runs (presumably to throttle API usage - confirm).
t.sleep(30)

Run for previous periods.

In [None]:
# First date to make up and number of consecutive days starting from it.
date_start = dt.datetime(2024, 4, 15)
n_days = 5 # Number of days to make up. For stability concern, maximum 5 days.

make_up_dates = [date_start + delta(days=i) for i in range(n_days)]

for make_up_date in make_up_dates:
    task_count += 1
    # period=None makes run() use the period of the current Singapore time on
    # each make-up date; run() declares `period` without a default, so it is
    # passed explicitly.
    run(date=make_up_date, period=None, tag=task_count)
    t.sleep(30)

print("")

Check data availability in the database for the previous 48 periods.

In [None]:
# Query to check row counts in database
def check_avail(date: dt.date, period: int, schema: str = "public"):
    """Report whether the DPR row and the LAR rows of one period are stored.

    Values are sent as bound parameters; only the schema name is
    interpolated into the SQL text.

    Args:
        date: Date of the period; its ``str()`` form is compared with ``"Date"``.
        period: Period number compared with ``"Period"``.
        schema: Database schema that contains the two tables.

    Returns:
        ``(exist_dpr, exist_lar)``: ``exist_dpr`` is a ``bool`` (the original
        returned ``None`` instead of ``False`` for a missing row);
        ``exist_lar`` maps each load scenario (``"H"``, ``"M"``, ``"L"``) to
        ``True`` when exactly 12 LAR rows are stored for it.
    """
    key_params = {"key_date": str(date), "key_period": int(period)}

    # check DPR existing row
    dpr_query = text(
        f'SELECT 1 FROM {schema}."RealTimeDPR" '
        'WHERE "Date" = :key_date AND "Period" = :key_period;'
    )
    exist_dpr = bool(conn.execute(dpr_query, key_params).scalar())

    # check LAR existing rows: a complete forecast has 12 rows per scenario
    lar_query = text(
        f'SELECT COUNT(*) FROM {schema}."RealTimeLAR" '
        'WHERE "Date" = :key_date AND "Period" = :key_period '
        'AND "LoadScenario" = :scenario;'
    )
    exist_lar = {"H": False, "M": False, "L": False}
    for loadscenario in ('H', 'L', 'M'):
        count_lar = int(
            conn.execute(lar_query, {**key_params, "scenario": loadscenario}).scalar()
        )
        exist_lar[loadscenario] = count_lar == 12

    return exist_dpr, exist_lar

In [None]:
# Period containing the current Singapore time (period 1 starts at 00:00).
now = dt.datetime.now(pytz.timezone('Asia/Singapore'))
period_now = now.hour * 2 + now.minute // 30 + 1

date_yesterday = now - dt.timedelta(days=1)

# Walk back over the 48 periods before the current one; values below 1 are
# periods of yesterday and are shifted back into the 1-48 range.
for period in range(period_now - 1, period_now - 49, -1):
    date = now
    if period < 1:
        period, date = period + 48, date_yesterday

    exist_dpr, exist_lar = check_avail(date.date(), period)

    # Placeholders: nothing is done yet when data is missing.
    if not exist_dpr:
        pass

    for loadscenario, exist in exist_lar.items():
        if not exist:
            pass

In [None]:
# Release the module-level database connection once all cells have run.
conn.close()