In [None]:
%matplotlib notebook
import tqdm
import tqdm.notebook
import h5py

import os
import shutil
import multiprocessing
import sys

import dateutil
import datetime
import time

import matplotlib
import matplotlib.pyplot as plt
import matplotlib.dates as mdates

import plotly.express as px

import numpy as np
import random
import strawb

import pandas

# Load DB

In [None]:
# Open the STRAWb file database and refresh it from ONC.
# NOTE(review): save_db=True presumably persists the updated DB and output=True
# presumably prints progress — confirm in strawb.SyncDBHandler.
db = strawb.SyncDBHandler()
db.load_onc_db_update(output=True, save_db=True)

# Mask files of interest: PMTSpec, PMT rates, downloaded, valid, not rate_scan

In [None]:
# Files of interest: PMTSD data of TUMPMTSPECTROMETER001 that are synced,
# have file_version > 2 and are not rate scans
mask = db.dataframe.dataProductCode == 'PMTSD'
mask &= db.dataframe.deviceCode == 'TUMPMTSPECTROMETER001'
mask &= db.dataframe.synced
mask &= db.dataframe.file_version > 2
mask &= db.dataframe.measurement_type != 'rate_scan'

# Example file: first matching row that starts between 04:00 and 05:00 on 2021-09-22
mask_time = db.dataframe.dateFrom >= '2021-09-22T04:00'
mask_time &= db.dataframe.dateFrom < '2021-09-22T05:00'
files_in_window = db.dataframe[mask & mask_time]  # filter once, reuse below

pmt = strawb.sensors.PMTSpec(files_in_window.fullPath.iloc[0])

# Last expression of the cell -> displayed: the DB entry of the example file
files_in_window.iloc[0]

In [None]:
# Inspection only: display the class that calculate_interp_rates uses to write the interpolated rates
strawb.trb_tools.InterpolatedRatesFile

In [None]:
def calculate_interp_rates(t, dataframe, dt, interp_frequency=1./60., bar_position=0, move_to_dss=True):
    """Interpolate the TRB rates of all files in [t, t+dt) and write them into one HDF5 file.

    The file is written to the current working directory and, if `move_to_dss`,
    moved afterwards to `strawb.Config.proc_data_dir/<device_code>/`.

    Parameters
    ----------
    t : datetime-like
        Start of the interval (inclusive); compared against `dataframe.dateFrom`.
    dataframe : pandas.DataFrame
        File table with at least the columns `dateFrom`, `deviceCode` and `fullPath`.
    dt : pandas DateOffset
        Interval length; its `freqstr` becomes part of the output file name.
    interp_frequency : float
        Value assigned to `pmt.trb_rates.interp_frequency` for every file.
    bar_position : int
        Position (row) of the per-file tqdm progress bar.
    move_to_dss : bool
        Move the finished file from the working directory to the processed-data directory.

    Returns
    -------
    dict
        Maps the path of every file whose interpolation setup failed to the exception's `args`.

    Raises
    ------
    ValueError
        If `dataframe` contains no file with `t <= dateFrom < t+dt`.
    """
    mask_i = dataframe.dateFrom >= t
    mask_i &= dataframe.dateFrom < t+dt

    # Fail with a clear message instead of an IndexError from `.unique()[0]` below.
    if not mask_i.any():
        raise ValueError(f'No files found in the interval [{t}, {t+dt}).')

    # NOTE(review): only the first device code is used; the caller is expected to mask one device.
    device_code = dataframe[mask_i].deviceCode.unique()[0]

    file_name = f'{device_code.lower()}_{t:%Y%m%d}_{dt.freqstr}_{interp_frequency:.2f}_rates_interpolated.hdf5'
    file_path_0 = os.path.join(os.path.abspath('.'), file_name)  # written here first

    directory = os.path.join(strawb.Config.proc_data_dir, device_code.lower())
    file_path_1 = os.path.join(directory, file_name)  # final location if move_to_dss

    # exist_ok=True: several worker processes may try to create this directory at the same time.
    os.makedirs(directory, exist_ok=True)

    file_attrs = {'device_code': device_code,
                  # 1 ms before t+dt, presumably to keep the interval end exclusive — TODO confirm
                  'file_end': (t+dt).timestamp() - 1e-3,
                  'file_start': t.timestamp(),
                  }
    group_attrs = {'interp_frequency': interp_frequency}

    err_dict = {}
    # Remove a leftover output file from an earlier run before writing a new one.
    if os.path.exists(file_path_0):
        os.remove(file_path_0)

    file = strawb.trb_tools.InterpolatedRatesFile(file_path_0, read_data=False)  # don't read as file doesn't exist

    print(f"Start new file for: {t}")  # you need this print to see the progress bar of this process
    for path_i in tqdm.notebook.tqdm(dataframe.fullPath[mask_i], position=bar_position, desc=str(t)):
        pmt_i = strawb.sensors.PMTSpec(path_i)

        daq_frequency_readout = pmt_i.file_handler.daq_frequency_readout[:]

        # Skip files whose readout frequency changes (entries equal to -1 are ignored).
        unique = np.unique(daq_frequency_readout[daq_frequency_readout != -1])
        if unique.shape[0] > 1:
            print('skipp ', os.path.basename(path_i), unique)
            continue
        del daq_frequency_readout, unique

        try:
            pmt_i.trb_rates.interp_frequency = interp_frequency
        except Exception as err:
            # Record the failure and continue with the next file.
            err_dict[path_i] = err.args
        else:
            file.write_to_file(pmt_i, file_attrs=file_attrs, group_attrs=group_attrs)
        finally:
            del pmt_i

    if move_to_dss:
        shutil.move(file_path_0, file_path_1)
    return err_dict

# Parse one file

In [None]:
# Quick check on a single 2 h interval starting 2021-09-22 00:00 UTC; the output
# file stays in the working directory (move_to_dss=False).
err = calculate_interp_rates(t=dateutil.parser.isoparse('2021-09-22 00:00:00+00:00'),
                             dt=pandas.offsets.Hour(2),
                             dataframe=db.dataframe[mask],
                             interp_frequency=1,
                             move_to_dss=False)
# Display the returned errors (file path -> exception args); empty when all files worked.
err

# Parse multiple files with multiple cores

In [None]:
# Time span covered by one output file. Any offset accepted by
# pandas.date_range(..., freq=) works, for example:
#   pandas.offsets.Second, pandas.offsets.Minute, pandas.offsets.Hour,
#   pandas.offsets.Day, pandas.offsets.Week, pandas.offsets.MonthBegin,
#   pandas.offsets.YearBegin
dt = pandas.offsets.MonthBegin(1)

# Option 1: set the overall time span manually
t_start = dateutil.parser.isoparse('2021-09-01 00:00:00+00:00')
t_end = dateutil.parser.isoparse('2021-09-05 10:00:00+00:00')

# Option 2: take min and max from the DB (start shifted back by one interval,
# presumably so the first interval begins before the earliest file)
# t_start = db.dataframe.dateFrom[mask].min() - dt
# t_end = db.dataframe.dateFrom[mask].max()

# Start time of every output file; normalize=True snaps start and end to midnight.
dr = pandas.date_range(start=t_start, end=t_end, freq=dt, normalize=True)

dr

In [None]:
def f(*args, **kwargs):
    """Scratch helper: print how positional and keyword arguments get packed."""
    for line in (f'args: {args}', f'kwargs: {kwargs}'):
        print(line)

f(1, 2, 3, 4, 5, 7, 8, 9, a=1)

[[error1, 1,2,3,], [error1, 1,2,3,]]

In [None]:
# Run calculate_interp_rates for every start time in `dr` on 4 worker processes.
# NOTE(review): the workers must find the notebook-defined function; this presumably
# relies on the 'fork' start method (Linux) — confirm before running elsewhere.

# Same selection for every task: evaluate it once instead of once per task.
files_of_interest = db.dataframe[mask]

# Total progress bar: one tick per finished interval (per-file bars use positions 1..len(dr))
pbar = tqdm.notebook.tqdm(dr, desc='Compile interpolated rates', position=0)

# collect results and errors
results = []
errors = []

# called after a process successfully executed the function
def update(*a):
    results.append(a)
    pbar.update()

# called after a process failed to execute the function
def update_err(*a):
    errors.append(a)
    pbar.update()

# The context manager terminates the workers even if submitting a task raises.
with multiprocessing.Pool(4) as pool:
    # create the tasks and execute them
    for i, dr_i in enumerate(dr):
        pool.apply_async(calculate_interp_rates,
                         args=(dr_i,),
                         kwds={'dataframe': files_of_interest,
                               'dt': dt,
                               'interp_frequency': 1,
                               'bar_position': i+1,
                               'move_to_dss': False},
                         callback=update,
                         error_callback=update_err)

    # wait for the processes to finish; this must happen inside the `with`,
    # because leaving it terminates the pool
    pool.close()
    pool.join()

# close the progress bar
pbar.close()

# Print errors and results
print('Results:', results)
print('Errors : ', errors)

# Test multiprocessing

In [None]:
# Dummy worker mimicking calculate_interp_rates: shows its own progress bar and
# fails on purpose for i == 2 to exercise the error callback.
def test(i, bar_position, *args, **kwargs):
    """Step through 5 dummy iterations and return `i`.

    Raises KeyError(i, 'Test') at step 3 when `i == 2`. Extra positional and
    keyword arguments are accepted and ignored.
    """
    print('')  # needed otherwise it doesn't work (the progress bar of this process is not shown)
    steps = tqdm.notebook.tqdm(range(5), desc=f'Process {i}', position=bar_position)
    for step in steps:
        time.sleep(.1)
        if i == 2 and step == 3:
            raise KeyError(i, 'Test')

    return i

# iterable for the pool
x = np.arange(5)

# `test` only uses i and bar_position; the other keywords mimic the real call and
# land in **kwargs. Evaluate the selection once instead of once per task.
files_of_interest = db.dataframe[mask]

# Total progress bar
pbar = tqdm.notebook.tqdm(x, desc='Compile interpolated rates', position=0)

# collect results and errors
results = []
errors = []

# called after a process successfully executed the function
def update(*a):
    results.append(a)
    pbar.update()

# called after a process failed to execute the function
def update_err(*a):
    errors.append(a)
    pbar.update()

# The context manager terminates the workers even if submitting a task raises.
with multiprocessing.Pool(4) as pool:
    # create the tasks and execute them
    for i, dr_i in enumerate(x):
        pool.apply_async(test,
                         args=(dr_i,),
                         kwds={'dataframe': files_of_interest,
                               'dt': dt,
                               'interp_frequency': 1,
                               'bar_position': i+1,
                               'move_to_dss': False},
                         callback=update,
                         error_callback=update_err)

    # wait for the processes to finish; this must happen inside the `with`,
    # because leaving it terminates the pool
    pool.close()
    pool.join()

# close the progress bar
pbar.close()

# Print errors and results
print('Results:', results)
print('Errors : ', errors)