In [1]:
import numpy as np
from datetime import datetime
from datetime import timedelta
import random
import sys
sys.path.append('/Users/jonathontordilla/Desktop/hombom24/archive-analysis/lcls-tools-plots/lcls_tools') # path to tools 
import common.data_analysis.archiver as arch # type: ignore

In [2]:
def get_days_between_datetime(start: str, end: str) -> int:
    # determine the amount of days between the start and end datetime
    format_string = "%Y/%m/%d %H:%M:%S"
    start_obj = datetime.strptime(start, format_string)
    end_obj = datetime.strptime(end, format_string)
    num_days = (end_obj - start_obj).days
    return num_days

In [3]:
def request_data_interval(pv_str: str, day: str, sample_size: int) -> float:
    """Get the amount of seconds between data points for a given PV on a given day.

    Makes a request for a PV and gets the average interval (in seconds) between data points for a given sample size.
    """

    # set the range of data and make a request
    start_str = f"{day} 00:00:00"
    end_str = f"{day} 23:59:59"
    start_time = datetime.strptime(start_str, "%Y/%m/%d %H:%M:%S")
    end_time = datetime.strptime(end_str, "%Y/%m/%d %H:%M:%S")
    arch_data = arch.get_values_over_time_range([pv_str], start_time, end_time)

    # get the list of timestamps and get the average interval based on the sample size
    intervals = []
    timestamps = arch_data[pv_str].timestamps
    if len(timestamps) == 0: 
        return 0.0
    for i in range(sample_size):
        random_first_index = random.randint(0, len(timestamps) - 2)
        second_index = random_first_index + 1
        interval = (timestamps[second_index] - timestamps[random_first_index]).total_seconds()
        intervals.append(interval)

    return float(np.mean(intervals))

In [4]:
def get_data_interval_dict(pv_str: str, start_time: str, end_time: str, period_in_days: int) \
        -> dict[float: list[datetime]]:
    """Gets the time interval between data points in a dataset. Finds the time interval between the first two data
    points for each period_in_days and returns a list of intervals (in seconds).

    Dictionary takes the form of: {interval: [start, end]}
    """

    # determine the amount of days between the start and end datetime
    num_days = get_days_between_datetime(start_time, end_time)

    interval_dict = {}
    # every period_in_days days, get the time interval between the first two data points
    day_index = 0
    while day_index < num_days:
        # get the datetime object for the current day
        format_string = "%Y/%m/%d %H:%M:%S"
        current_time_obj = datetime.strptime(start_time, format_string) + timedelta(days=day_index)
        current_time_string = current_time_obj.strftime(format_string)
        # if there are less than 30 days left, get the interval between the next two data points
        if day_index + period_in_days >= num_days:
            # get interval
            data_interval = request_data_interval(pv_str, current_time_obj.strftime("%Y/%m/%d"), 100)
            interval_dict[data_interval] = [current_time_obj, current_time_obj + timedelta(days=(num_days - day_index))]
            break
        # for each period, get the interval between the first two data points in that period
        if day_index % period_in_days == 0 or day_index == 1:
            # get interval
            data_interval = request_data_interval(pv_str, current_time_obj.strftime("%Y/%m/%d"), 100)
            interval_dict[data_interval] = [current_time_obj, current_time_obj + timedelta(days=period_in_days)]
            day_index += period_in_days

    return interval_dict

In [5]:
my_dict = get_data_interval_dict("BPMS:L0B:0183:FW:X_SLOW", "2024/05/02 00:00:00", "2024/06/10 00:00:00", 20)
my_dict

ValueError: empty range in randrange(0, 0)