In [1]:
import numpy as np
import matplotlib.pyplot as plt
import datetime
import glob
import pandas as pd
import os
from scipy import signal
from obspy import Stream, Trace
import gc

from nptdms import TdmsFile
from DasTools import DasPrep as dp

import pyproj

from multiprocessing import Pool
from functools import partial

from das2sac_batch_run_parallel_func import das_processing, get_das_file_time, das_st_write_sac

In [3]:
# --- Locations -------------------------------------------------------------
# Folder holding the raw DAS HDF5 files and the root folder for the SAC output.
datapath =  'L:\\anyuan_mine_2\\'
sacpath = 'L:\\anyuan_mine_2_sac\\'

# All recordings, ordered by file name.
datafile = sorted(glob.glob(datapath + '*.h5'))

# --- File start times ------------------------------------------------------
# The start time of each recording is parsed from its full path with this
# strptime pattern; it is kept both as datetime objects and as
# microsecond-resolution numpy datetime64 values.
fname_format = datapath + 'anyuan_GL_5m_frq_1kHz_sp_1m_UTC_%Y%m%d_%H%M%S.%f.h5'
fname_datetime = np.array([datetime.datetime.strptime(path, fname_format) for path in datafile])
fname_npdatetime = np.array([np.datetime64(stamp, 'us') for stamp in fname_datetime])

# Make sure the output root exists.
if not os.path.exists(sacpath):
    os.makedirs(sacpath)

# --- Acquisition metadata --------------------------------------------------
# Read the metadata from the file in the middle of the sorted list and show it.
middle_file = datafile[len(datafile) // 2]
metadata = dp.read_das(middle_file, metadata=True)

for key in metadata.keys():
    print(key, ':', metadata[key])

# Sampling interval, samples per file and the resulting length of one file.
dt = metadata['dt']
nt = metadata['nt']
file_len = dt * nt

# --- Channel selection -----------------------------------------------------
# Channels ch1 (inclusive) to ch2 (exclusive) are the ones converted.
ch1, ch2 = 90, 1442
das_ch_id = np.arange(ch1, ch2)

print(dt,nt, file_len)

dt : 0.001
nt : 60000
dx : 1.02095198631
nch : 1472
GL : 5.0
headers : {'AcquisitionDescription': b'', 'AcquisitionId': b'd6fdc77a-ae5f-1b30-3b90-653f4d3243ef', 'FacilityId': array([b'TBD'],
      dtype='|S3'), 'GaugeLength': 5.0, 'GaugeLength.uom': b'm', 'MaximumFrequency': 500.0, 'MaximumFrequency.uom': b'Hz', 'MeasurementStartTime': b'2023-09-01T07:09:48.193749+00:00', 'MinimumFrequency': 0.0, 'MinimumFrequency.uom': b'Hz', 'NumberOfLoci': 1472, 'PulseRate': 16000.0, 'PulseRate.uom': b'Hz', 'PulseWidth': 25.0, 'PulseWidth.uom': b'ns', 'ServiceCompanyName': b'Silixa', 'SpatialSamplingInterval': 1.0209519863128662, 'SpatialSamplingInterval.uom': b'm', 'StartLocusIndex': 0, 'TriggeredMeasurement': False, 'schemaVersion': b'2.1', 'uuid': b'2e63b005-3ece-493b-a723-e0a41856f9f5'}
0.001 60000 60.0


In [4]:
def get_continuous_segments(fname_datetime, file_len, tol):
    """Split file start times into gap-free segments and keep the long ones.

    Two consecutive files belong to the same segment when their start times
    differ by no more than ``file_len`` seconds (with a 0.01 % allowance).
    Each segment is then shortened by 1.5 (slightly padded) file lengths at
    both ends, and only segments whose shortened duration exceeds ``tol``
    seconds are returned.

    Parameters
    ----------
    fname_datetime : sequence of datetime.datetime
        File start times in ascending order. A plain list is accepted; it is
        converted to a numpy array internally.
    file_len : float
        Length of one file in seconds.
    tol : float
        Minimum duration, in seconds, a shortened segment must exceed.

    Returns
    -------
    segment_start_datetime : numpy.ndarray of datetime.datetime
        Shifted start time of every retained segment.
    segment_end_datetime : numpy.ndarray of datetime.datetime
        Shifted end time of every retained segment.
    continuous_segment_size : numpy.ndarray of float
        Duration in seconds (end minus start) of every retained segment.
        All three arrays are empty when there are no input files.
    """
    # Accept lists/tuples as well as arrays: the fancy indexing below needs an ndarray.
    fname_datetime = np.asarray(fname_datetime)

    # No files means no segments; without this guard the indexing below
    # raises IndexError because segment_end would be [-1] on an empty array.
    if fname_datetime.size == 0:
        return (np.array([], dtype=object),
                np.array([], dtype=object),
                np.array([], dtype=float))

    # File length with a 0.01 % allowance, so that back-to-back files whose
    # start times differ by marginally more than file_len are not split.
    file_len_timedelta = datetime.timedelta(seconds=file_len*1.0001)

    # A new segment starts wherever consecutive files are separated by more
    # than one (padded) file length.
    fname_datetime_diff = np.diff(fname_datetime)
    segment_diff = np.where(fname_datetime_diff > file_len_timedelta)[0]
    segment_start = np.r_[0, segment_diff + 1]
    segment_end = np.r_[segment_diff, len(fname_datetime) - 1]

    # Shift the start later and the end earlier by 1.5 file lengths to leave
    # a buffer for rolling at both ends of the segment.
    edge_buffer = file_len_timedelta * 1.5
    segment_start_datetime = fname_datetime[segment_start] + edge_buffer
    segment_end_datetime = fname_datetime[segment_end] - edge_buffer

    continuous_segment_size = np.array([x.total_seconds() for x in (segment_end_datetime - segment_start_datetime)])

    # Segments shorter than the two buffers come out negative and are dropped here too.
    segment_choose = np.where(continuous_segment_size > tol)[0]
    return segment_start_datetime[segment_choose], segment_end_datetime[segment_choose], continuous_segment_size[segment_choose]


# Keep the segments whose buffered duration exceeds tol = 10*60 s (10 minutes)
# and show the retained durations in seconds.
segment_start_datetime, segment_end_datetime, continuous_segment_size = get_continuous_segments(
    fname_datetime,
    file_len,
    tol=10*60,
)
print(continuous_segment_size)

[   78599.982   123839.982   260579.982  1101719.982]


In [4]:
# Display the (start, end) pair of every retained segment.
list(zip(segment_start_datetime, segment_end_datetime))

[(datetime.datetime(2023, 8, 8, 5, 48, 33, 238000),
  datetime.datetime(2023, 8, 9, 3, 38, 33, 220000)),
 (datetime.datetime(2023, 8, 9, 3, 43, 7, 702000),
  datetime.datetime(2023, 8, 10, 14, 7, 7, 684000)),
 (datetime.datetime(2023, 8, 10, 14, 13, 40, 637000),
  datetime.datetime(2023, 8, 13, 14, 36, 40, 619000)),
 (datetime.datetime(2023, 9, 1, 7, 11, 18, 202000),
  datetime.datetime(2023, 9, 14, 1, 13, 18, 184000))]

In [10]:
# Summarise every retained segment: its length in whole hours, then its start and end.
for isegment, (seg_begin, seg_finish) in enumerate(zip(segment_start_datetime, segment_end_datetime)):
    start_time = np.datetime64(seg_begin)
    end_time = np.datetime64(seg_finish)
    segment_size = end_time - start_time

    # Casting to an hour-resolution timedelta keeps whole hours only.
    duration_hours = segment_size.astype('timedelta64[h]')
    print(f'Segment {isegment} : {duration_hours}')
    print(start_time, end_time, ' ', sep='\n')

Segment 0 : 21 hours
2023-08-08T05:48:33.238000
2023-08-09T03:38:33.220000
 
Segment 1 : 34 hours
2023-08-09T03:43:07.702000
2023-08-10T14:07:07.684000
 
Segment 2 : 72 hours
2023-08-10T14:13:40.637000
2023-08-13T14:36:40.619000
 
Segment 3 : 306 hours
2023-09-01T07:11:18.202000
2023-09-14T01:13:18.184000
 


In [7]:
%%time

# Convert every continuous segment to SAC files, one folder per chunk.
#
# Layout of the work:
#   segment  -> split into chunks of `chunk_size`
#   chunk    -> split into intervals of `increment * nprocs`
#   interval -> `nprocs` workers, each processing one `increment` of data
# Only whole intervals are processed; for the last chunk of a segment the
# part of the remainder that does not fill a whole interval is skipped.

nw = 'AY'    # network code written to the trace headers
sta = 'DAS'  # station code written to the trace headers
mlist = np.array([2,5])  # passed through to das_processing (presumably decimation factors -- TODO confirm)
nprocs = 20  # number of worker processes

chunk_size = np.timedelta64(7200, 's')  # amount of data written per chunk folder (7200 s = 2 h)
increment = np.timedelta64(120, 's') # one-time increment of data one thread holds


interval = increment * nprocs # total increment of data all threads hold
for isegment in range(len(segment_start_datetime)):

    start_time, end_time = segment_start_datetime[isegment], segment_end_datetime[isegment]
    start_time = np.datetime64(start_time)
    end_time = np.datetime64(end_time)
    segment_size = end_time - start_time
    # Full chunks plus one trailing chunk for the remainder.
    chunk_num = segment_size // chunk_size + 1

    print('Segment id: %d'%(isegment))
    print('Segment size: %s'%(segment_size.astype('timedelta64[s]')))
    print('Regular chunk size: %s'%(chunk_size))
    print('Regular chunk number: %s'%(chunk_num - 1))
    print('Remainder chunk size: %s'%((segment_size % chunk_size).astype('timedelta64[s]')))

    print('nCPU number: %s'%(nprocs))
    print('Increment by each worker: %s'%(increment))
    print('Interval size: %s'%(interval))
    print('Interval number of regular chunks (if any): %s'%(chunk_size // interval))
    print('Interval number of the remainder chunk: %s'%(segment_size % chunk_size // interval))
    print(' ')

    # Folder names carry the start time with the ':' characters removed.
    segment_folder_path = sacpath + 'SAC-segment-' + str(start_time.astype('datetime64[s]')).replace(':', '')
    os.makedirs(segment_folder_path, exist_ok=True)

    for ichunk in range(chunk_num):

        chunk_start_time = start_time + chunk_size * ichunk

        chunk_folder_path = os.path.join(segment_folder_path,
                                         'SAC-chunk-' + str(chunk_start_time.astype('datetime64[s]')).replace(':', ''))

        # Resume support: a chunk whose folder already exists is skipped.
        # NOTE(review): the folder is created before the SAC files are written,
        # so a run interrupted while writing leaves a partial chunk that is
        # treated as complete here.
        if os.path.exists(chunk_folder_path):
            print('Chunk %d of Segment %d folder already exists: %s'%(ichunk, isegment, chunk_folder_path))
            print(' ')
            continue

        print('Chunk %d in Segment %d: %s'%(ichunk, isegment, chunk_folder_path))
        print('Start time of this chunk: %s'%(chunk_start_time.astype('datetime64[s]')))

        # Regular chunks hold chunk_size // interval intervals; the last chunk
        # holds as many whole intervals as fit into the remainder.
        interval_blocks = []
        interval_num = chunk_size // interval if ichunk < chunk_num-1 else segment_size % chunk_size // interval
        print('Total intervals in this Chunk: %d'%(interval_num))
        with Pool(processes=nprocs) as pool:
            for j in range(int(interval_num)):
                print('Working on Interval %d in Chunk %d in Segment %d'%(j, ichunk, isegment))
                # One start time per worker, spaced by `increment`.
                increment_start_times = [ chunk_start_time + (interval * j) + (increment * i) for i in range(nprocs) ]
                res = pool.map(partial(das_processing,
                                       interval=increment,
                                       datafile=datafile,
                                       datafile_time=fname_npdatetime,
                                       ch1=ch1, ch2=ch2, mlist=mlist),
                               increment_start_times)

                # Each result is indexed as (data, dt); the worker outputs are
                # joined along axis 1 in start-time order.
                interval_blocks.append(np.concatenate([worker_out[0] for worker_out in res], axis=1))
                # NOTE(review): this rebinds the notebook-level `dt` to the
                # value returned by das_processing.
                dt = res[0][1]
                del res

        if len(interval_blocks) > 0:
            data = np.concatenate(interval_blocks, axis=1)
            # Release the per-interval pieces now that they are merged.
            del interval_blocks

            # Trace start time: start time of the first file that begins
            # within the first `increment` of the chunk.
            # NOTE(review): raises IndexError if no file starts in that window.
            chunk_start_time_1 = chunk_start_time + increment
            datafile_arg_choose = np.where((fname_npdatetime>=chunk_start_time)&(fname_npdatetime<chunk_start_time_1))[0]
            chunk_start_time_from_file = fname_npdatetime[datafile_arg_choose][0]

            das_st = Stream()
            # data_ich is the row in `data`, ich the channel number; enumerate
            # replaces a per-channel np.where search over das_ch_id.
            for data_ich, ich in enumerate(das_ch_id):

                tr = Trace(data=data[data_ich,:], header={'network':nw,
                                                          'station': sta,
                                                          'location':str(data_ich),
                                                          'channel': str(ich),
                                                         'starttime':str(chunk_start_time_from_file),
                                                          'delta':dt})

                das_st.append(tr)

            print('Writing to sac...')

            os.makedirs(chunk_folder_path, exist_ok=True)

            # One task per trace, written in parallel.
            with Pool(processes=nprocs) as pool:
                pool.map(partial(das_st_write_sac, date_folder_path=chunk_folder_path, write_coordinates=False), das_st)

            # Drop every reference to the chunk data before the next chunk is
            # loaded (`tr` still points at a row of `data`).
            del tr
            del das_st
            del data
        gc.collect()
        print(' ')

Segment id: 0
Segment size: 78599 seconds
Regular chunk size: 7200 seconds
Regular chunk number: 10
Remainder chunk size: 6599 seconds
nCPU number: 20
Increment by each worker: 120 seconds
Interval size: 2400 seconds
Interval number of regular chunks (if any): 3
Interval number of the remainder chunk: 2
 
Chunk 0 in Segment 0: M:\anyuan_mine_2_sac\SAC-segment-2023-08-08T054833\SAC-chunk-2023-08-08T054833
Start time of this chunk: 2023-08-08T05:48:33
Total intervals in this Chunk: 3
Working on Interval 0 in Chunk 0 in Segment 0
Working on Interval 1 in Chunk 0 in Segment 0
Working on Interval 2 in Chunk 0 in Segment 0
Writing to sac...
 
Chunk 1 in Segment 0: M:\anyuan_mine_2_sac\SAC-segment-2023-08-08T054833\SAC-chunk-2023-08-08T074833
Start time of this chunk: 2023-08-08T07:48:33
Total intervals in this Chunk: 3
Working on Interval 0 in Chunk 1 in Segment 0
Working on Interval 1 in Chunk 1 in Segment 0
Working on Interval 2 in Chunk 1 in Segment 0
Writing to sac...
 
Chunk 2 in Segmen

 
Wall time: 1h 4min 29s
