CHECKING HOW LONG DOES A SINGLE CONVERSION TAKES

In [1]:
from pyart.io import uf
from datetime import timedelta
from datetime import datetime
from numpy import ma
from gzip import open as gzip_open

class UFReader():
    """Reader that reads all data from a set of UF Radar Files.
    """
    def __init__(self, file_path):
        self.uf_file = file_path

    def _read_radar(self):
        """Read the radar file and return some important values.

        :return: The radar data including some important values.
        :rtype: Tuple
        """
        uf_filename = self.uf_file
        if uf_filename.endswith('.gz'):
            with gzip_open(uf_filename, 'rb') as unzipped_file:
                radar = uf.read_uf(
                    unzipped_file,
                    file_field_names=True
                )
        else:
            radar = uf.read_uf(
                uf_filename,
                file_field_names=True
            )
        sweep_start_ray_idx = radar.sweep_start_ray_index['data'][:]
        sweep_end_ray_idx = radar.sweep_end_ray_index['data'][:]

        cz = (radar.fields['CZ']['data'][:])
        dr = (radar.fields['DR']['data'][:])
        rh = (radar.fields['RH']['data'][:])
        
        fh = (radar.fields['FH']['data'][:])
        dm = (radar.fields['DM']['data'][:])


        # Lucy Wang added the following command lines on July 23, 2018
        gate_latitude = radar.gate_latitude['data'][:]
        gate_longitude = radar.gate_longitude['data'][:]
        gate_altitude = radar.gate_altitude['data'][:]
        # -------------------------------------------------------------

        # Get time in %Y-%m-%d%H:%M:%SZ format
        full_time = radar.time['units']\
            .replace('since', '')\
            .replace('seconds', '')
        full_time = full_time.replace(' ', '')
        full_time = full_time.replace('T', '')
        full_time = datetime.strptime(full_time, '%Y-%m-%d%H:%M:%SZ')

        return (
            sweep_start_ray_idx,
            sweep_end_ray_idx,
            cz,
            dr,
            rh,
            fh,
            dm,
            gate_latitude,
            gate_longitude,
            gate_altitude,
            full_time,
            radar,
        )

    def read_data(self):
        """Generator function that generates each datum from the radar file
        sequentially.
        """
        index = 0
        (
            sweep_start_ray_idx,
            sweep_end_ray_idx,
            CZ,
            DR,
            RH,
            FH,
            DM,
            gate_latitude,
            gate_longitude,
            gate_altitude,
            full_time,
            radar
        ) = self._read_radar()

        # sweep by sweep; 20 sweeps per rhi_a file (over ocean)
        print("1. outer LOOP ################", radar.nsweeps)
        for ii in range(0, radar.nsweeps):
            # Calculate the start and end time for this sweep
            idx0 = sweep_start_ray_idx[ii]
            idx1 = sweep_end_ray_idx[ii]

            # ray by ray; 226 rays per sweep
            print("2. inner loop ################", len(range(idx0, idx1 + 1)))
            for ray in range(idx0, idx1 + 1):
                tmp_cz = CZ[ray, :]
                tmp_dr = DR[ray, :]
                tmp_rh = RH[ray, :]
                tmp_fh = FH[ray, :]
                tmp_dm = DM[ray, :]
                tmp_time_ray = (full_time + timedelta(
                    seconds=float(radar.time['data'][ray])
                )).strftime('%Y-%m-%dT%H:%M:%SZ')
                tmp_gate_lat = gate_latitude[ray, :]
                tmp_gate_lon = gate_longitude[ray, :]
                tmp_gate_alt = gate_altitude[ray, :]

                # check CZ values gate by gate; 1081 gates per ray
                print("3. another inner loop ################", len(tmp_cz))
                
                for gate in range(0, len(tmp_cz)):
                    if tmp_cz[gate] is not ma.masked:
                        row_dict = {
                            'timestamp': tmp_time_ray,
                            'lat': round(float(tmp_gate_lat[gate]), 4),
                            'lon': round(float(tmp_gate_lon[gate]), 4),
                            'height': float(tmp_gate_alt[gate]),
                            'CZ': float(tmp_cz[gate]),
                            'DR': float(tmp_dr[gate]),
                            'RH': float(tmp_rh[gate]),
                            'FH': float(tmp_fh[gate]),
                            'DM': float(tmp_dm[gate]),
                        }
                        yield row_dict
                        index += 1



## You are using the Python ARM Radar Toolkit (Py-ART), an open source
## library for working with weather radar data. Py-ART is partly
## supported by the U.S. Department of Energy as part of the Atmospheric
## Radiation Measurement (ARM) Climate Research Facility, an Office of
## Science user facility.
##
## If you use this software to prepare a publication, please cite:
##
##     JJ Helmus and SM Collis, JORS 2016, doi: 10.5334/jors.119



In [2]:
import os
import json
import numpy as np
from datetime import datetime
from threading import Thread, Lock

to_rad = np.pi / 180.0
to_deg = 180.0 / np.pi

steps = [32, 16, 8, 4, 2, 1]

class PointCloud:
    def __init__(self, key, lon, lat, alt, value, time, epoch):
        self.key = key
        self.lon = lon
        self.lat = lat
        self.alt = alt
        self.time = time
        self.value = value
        self.epoch = epoch
        self.tasks = []
        self.threads = []
        for i in range(10):
            self.threads.append(Thread(target=self.worker_function))
        self.tileset_lock = Lock()
        self.tileset_json = {
        	"asset": {
        		"version": "1.0",
        		"type": "Airborne Radar"
        	},
        	"root": {
        		"geometricError": 1000000,
        		"refine" : "REPLACE",
        		"boundingVolume": {
                    "region": [
                        float(np.min(lon)) * to_rad,
                        float(np.min(lat)) * to_rad,
                        float(np.max(lon)) * to_rad,
                        float(np.max(lat)) * to_rad,
                        float(np.min(alt)) * to_rad,
                        float(np.max(alt)) * to_rad
                    ]
                },
                "children": []
        	},
            "properties": {
                "epoch": "{}Z".format(datetime.utcfromtimestamp(epoch).isoformat()),
                "refined": []
            }
        }


    def worker_function(self):
        while len(self.tasks) > 0:
                tile, start, end = self.tasks.pop()
                print(tile, start, end)
                self.generate(tile, start, end)


    def start(self):
        for t in self.threads:
            t.start()


    def join(self):
        for t in self.threads:
            t.join()
        with open('{}/tileset.json'.format(self.key), mode='w+') as outfile:
            json.dump(self.tileset_json, outfile)


    def schedule_task(self, tile, start, end):
        self.tasks.append((tile, start, end))


    def generate(self, tile, start, end):
        print(tile, start, end)
        parent_tile = self.tileset_json["root"]
        cartesian, offset, scale, cartographic, region = self.cartographic_to_cartesian(start, end)

        value = self.value[start:end]
        time = self.time[start:end]

        epoch = int(np.min(time) + self.epoch - 300)
        epoch = "{}Z".format(datetime.utcfromtimestamp(epoch).isoformat())
        end = int(np.max(time) + self.epoch + 300)
        end = "{}Z".format(datetime.utcfromtimestamp(end).isoformat())

        header_length = 28
        magic = np.string_("pnts")
        version = 1

        for step in steps:
            self.tileset_lock.acquire()
            try:
                filename = "{}_{}.pnts".format(tile, step)
                child_tile = {
                    "availability": "{}/{}".format(epoch, end),
                    "geometricError": step * 500,
                    "boundingVolume": {
                        "region": region
                    },
                    "content": {
                        "uri": filename
                    },
                    "refine": "REPLACE"
                }
                if step == 1:
                    self.tileset_json["properties"]["refined"].append(filename)
                else:
                    child_tile["children"] = []
                parent_tile["children"].append(child_tile)
                parent_tile = child_tile
            finally:
                self.tileset_lock.release()

            tile_length = 0
            feature_table_binary_byte_length = 0
            batch_table_binary_byte_length = 0
            length = value[::step].size

            feature_table_json = {
                "POINTS_LENGTH": length,
                "BATCH_LENGTH": length,
                "BATCH_ID": {
                    "byteOffset": 0,
                    "componentType": "UNSIGNED_INT"
                },
                "POSITION_QUANTIZED": {
                    "byteOffset": length * 4
                },
                "QUANTIZED_VOLUME_OFFSET": offset,
                "QUANTIZED_VOLUME_SCALE": scale
            }

            batch_table_json = {
                "value": {
                    "byteOffset": 0,
                    "componentType": "FLOAT",
                    "type": "SCALAR"
                },
                "time": {
                    "byteOffset": length * 4,
                    "componentType": "FLOAT",
                    "type": "SCALAR"
                },
                "location": {
                    "byteOffset": length * 8,
                    "componentType": "SHORT",
                    "type": "VEC3"
                }
            }

            tile_length += header_length

            feature_table_json_min = json.dumps(feature_table_json, separators=(",", ":")) + "       "
            feature_table_trim = (tile_length + len(feature_table_json_min)) % 8
            if feature_table_trim != 0:
                feature_table_json_min = feature_table_json_min[:-feature_table_trim]

            tile_length += len(feature_table_json_min)

            feature_table_binary_byte_length = length * 4 + length * 3 * 2
            tile_length += feature_table_binary_byte_length
            feature_table_padding = tile_length % 8
            if feature_table_padding != 0:
                feature_table_padding = 8 - feature_table_padding
            tile_length += feature_table_padding

            batch_table_json_min = json.dumps(batch_table_json, separators=(",", ":")) + "       "
            batch_table_trim = (tile_length + len(batch_table_json_min)) % 8
            if batch_table_trim != 0:
                batch_table_json_min = batch_table_json_min[:-batch_table_trim]

            tile_length += len(batch_table_json_min)

            batch_table_binary_byte_length = length * 4 * 2 + length * 2 * 3
            tile_length += batch_table_binary_byte_length
            batch_table_padding = tile_length % 8
            if batch_table_padding != 0:
                batch_table_padding = 8 - batch_table_padding
            tile_length += batch_table_padding

            with open('{}/{}'.format(self.key, filename), mode='wb+') as outfile:
                outfile.write(np.string_(magic).tobytes())
                outfile.write(np.uint32(version).tobytes())
                outfile.write(np.uint32(tile_length).tobytes())
                outfile.write(np.uint32(len(feature_table_json_min)).tobytes())
                outfile.write(np.uint32(feature_table_binary_byte_length + feature_table_padding).tobytes())
                outfile.write(np.uint32(len(batch_table_json_min)).tobytes())
                outfile.write(np.uint32(batch_table_binary_byte_length + batch_table_padding).tobytes())
                outfile.write(np.string_(feature_table_json_min).tobytes())
                outfile.write(np.arange(length, dtype=np.uint32).tobytes())
                outfile.write(cartesian[::step, :].tobytes())
                for _ in range(feature_table_padding):
                    outfile.write(np.string_(" ").tobytes())
                outfile.write(np.string_(batch_table_json_min).tobytes())
                outfile.write(value[::step].astype(np.float32).tobytes())
                outfile.write(time[::step].astype(np.float32).tobytes())
                outfile.write(cartographic[::step, :].tobytes())
                for _ in range(batch_table_padding):
                    outfile.write(np.string_(" ").tobytes())
                outfile.seek(0)


    def cartographic_to_cartesian(self, start, end):
        lon = self.lon[start:end]
        lat = self.lat[start:end]
        alt = self.alt[start:end]
        size = lon.size

        cartographic = np.zeros(shape=(size, 3), dtype=np.int16)
        cartographic[:, 0] = (lon * 32767 / 180).astype(np.int16)
        cartographic[:, 1] = (lat * 32767 / 180).astype(np.int16)
        cartographic[:, 2] = (alt / 10).astype(np.int16)

        lon = lon * to_rad
        lat = lat * to_rad

        radiiSquared = np.array([40680631590769, 40680631590769, 40408299984661.445], dtype=np.float64)

        N1 = np.multiply(np.cos(lat), np.cos(lon))
        N2 = np.multiply(np.cos(lat), np.sin(lon))
        N3 = np.sin(lat)

        magnitude = np.sqrt(np.square(N1) + np.square(N2) + np.square(N3))

        N1 = N1 / magnitude
        N2 = N2 / magnitude
        N3 = N3 / magnitude

        K1 = radiiSquared[0] * N1
        K2 = radiiSquared[1] * N2
        K3 = radiiSquared[2] * N3

        gamma = np.sqrt(np.multiply(N1, K1) + np.multiply(N2, K2) + np.multiply(N3, K3))

        K1 = K1 / gamma
        K2 = K2 / gamma
        K3 = K3 / gamma

        N1 = np.multiply(N1, alt)
        N2 = np.multiply(N2, alt)
        N3 = np.multiply(N3, alt)

        # x = np.multiply((N1 + K1), np.random.normal(1, .00005, N1.size))
        # y = np.multiply((N2 + K2), np.random.normal(1, .00005, N1.size))
        # z = np.multiply((N3 + K3), np.random.normal(1, .00005, N1.size))

        x = N1 + K1
        y = N2 + K2
        z = N3 + K3

        offset = [float(np.min(x)), float(np.min(y)), float(np.min(z))]

        x = x - offset[0]
        y = y - offset[1]
        z = z - offset[2]

        scale = [float(abs(np.max(x))), float(abs(np.max(y))), float(abs(np.max(z)))]

        cartesian = np.zeros(shape=(size, 3), dtype=np.uint16)
        cartesian[:, 0] = (x / scale[0] * 65535.0).astype(np.uint16)
        cartesian[:, 1] = (y / scale[1] * 65535.0).astype(np.uint16)
        cartesian[:, 2] = (z / scale[2] * 65535.0).astype(np.uint16)

        region = [
            float(np.min(lon)),
            float(np.min(lat)),
            float(np.max(lon)),
            float(np.max(lat)),
            float(np.min(alt)),
            float(np.max(alt))
        ]

        return cartesian, offset, scale, cartographic, region


In [3]:
import sys
import os
import zarr
import numpy as np
import json
import datetime as dt

to_rad = np.pi / 180.0
to_deg = 180.0 / np.pi

def generate_point_cloud(variable, epoch, end, zarr_location, point_cloud_folder):
    """Generates json pointcloud from a given zarr file input

    Args:
        variable (_type_): _description_
        epoch (_type_): _description_
        end (_type_): _description_
        zarr_location (string): source zarr file.
        point_cloud_folder (string): destination folder for 3d tile json file.
    """

    #out_key = f"{os.getenv('CRS_OUTPUT_FLIGHT_PATH')}/{shortname}"
    #pc_out_key = f"{output_path}/point_cloud"

    '''
    try:
        os.mkdir(out_key)
    except:
        pass
    '''

    try:
        os.mkdir(point_cloud_folder)
    except:
        pass

    # LOAD THE DATA.
    store = zarr.DirectoryStore(zarr_location)
    root = zarr.group(store=store)

    chunk_id = root["chunk_id"][:]
    num_chunks = chunk_id.shape[0]
    id = np.argmax(chunk_id[:, 1] > epoch) - 1
    start_id = chunk_id[0 if id < 0 else id, 0]
    id = num_chunks - np.argmax(chunk_id[::-1, 1] < end)
    end_id = chunk_id[id, 0] if id < num_chunks else root["time"].size - 1

    root_epoch = root.attrs["epoch"]
    location = root["location"][start_id:end_id]
    lon = location[:, 0]
    lat = location[:, 1]
    alt = location[:, 2]
    value = root["value"][variable][start_id:end_id]
    time = root["time"][start_id:end_id]

    # filter data using mask
    epoch = epoch - root_epoch # date-time
    end = end - root_epoch
    mask = np.logical_and(time >= epoch, time <= end)
    lon = lon[mask]
    lat = lat[mask]
    alt = alt[mask]
    value = value[mask]
    time = time[mask]

    # Generate Pointcloud Tileset
    point_cloud = PointCloud(point_cloud_folder, lon, lat, alt, value, time, root_epoch)

    print("start --->", dt.datetime.now())
    for tile in range(int(np.ceil(time.size / 530000))):
        start_id = tile * 530000
        end_id = np.min([start_id + 530000, time.size])
        point_cloud.schedule_task(tile, start_id, end_id)

    point_cloud.start()
    point_cloud.join()

    print("end --->", dt.datetime.now())

tileset_json = {
	"asset": {
		"version": "1.0",
		"type": "Airborne Radar"
	},
	"root": {
		"geometricError": 1000000,
		"refine" : "REPLACE",
		"boundingVolume": {
            "region": []
        },
        "children": []
	},
    "properties": {
        "epoch": "",
        "refined": []
    }
}

#if __name__ == '__main__':
#    main(sys.argv[1], sys.argv[2], int(sys.argv[3]), int(sys.argv[4]))


In [4]:
import os
import zarr
import numpy as np
import xarray as xr
import shutil
import boto3
from pathlib import Path
import s3fs
import h5py
import pandas as pd
from boto3 import client as boto_client
import tarfile
import glob

# META needed for ingest
campaign = 'Olympex'
collection = "AirborneRadar"
dataset = "gpmValidationOlympexcrs"
variables = ["ref"]
renderers = ["point_cloud"]
chunk = 262144
to_rad = np.pi / 180
to_deg = 180 / np.pi

def ingest(folder, filePath):
    """
    Converts Level 1B crs data from s3 to zarr file and then stores it in the provided folder
    Args:
        folder (string): name to hold the raw files.
        file (string): the s3 url to the raw file. WHAT FORMAT IS IT IN in hdf5 format
    """
    store = zarr.DirectoryStore(folder)
    root = zarr.group(store=store)
    
    # Create empty rows for modified data    
    z_chunk_id = root.create_dataset('chunk_id', shape=(0, 2), chunks=None, dtype=np.int64)
    z_location = root.create_dataset('location', shape=(0, 3), chunks=(chunk, None), dtype=np.float32)
    z_time = root.create_dataset('time', shape=(0), chunks=(chunk), dtype=np.int32)
    z_vars = root.create_group('value')
    z_ref = z_vars.create_dataset('atb', shape=(0), chunks=(chunk), dtype=np.float32)
    n_time = np.array([], dtype=np.int64)

    # date = file.split("_")[5].split(".")[0]
    # base_time = np.datetime64('{}-{}-{}'.format(date[:4], date[4:6], date[6:]))

    print("Accessing file to convert to zarr ")
    
    # fs = s3fs.S3FileSystem(anon=False)
    # with fs.open(file) as cplfile:
    #     with h5py.File(cplfile, 'r') as f1:
    #         atb = f1['ATB_1064'][()]            
    #         lon  = f1['Longitude'][()]
    #         lat  = f1['Latitude'][()]
    #         alt  = f1['Plane_Alt'][()] * 1000   #[km] ==> [m]

    #         # !!! if over 24 hour fix not applied
    #         delta = [(base_time + (h*3600+m*60+s).astype('timedelta64[s]')) for (h,m,s) in 
    #                 zip(f1['Hour'][()], f1['Minute'][()], f1['Second'][()])] #delta is in seconds

    # !!! input uf file path is inside UFREADER
    ufr = UFReader(filePath)
    uf_datas = ufr.read_data() # it will return a generator.

    # using the generator, populate all the lon, lat, alt and atb values

    atb = np.array([], dtype=np.int64)            
    lon = np.array([], dtype=np.int64)
    lat = np.array([], dtype=np.int64)
    alt = np.array([], dtype=np.int64)
    time = np.array([], dtype=np.int64)

    for uf_data in uf_datas:
        atb = np.append(atb, uf_data['CZ'])
        lon = np.append(lon, uf_data['lon'])
        lat = np.append(lat, uf_data['lat'])
        alt = np.append(alt, uf_data['height'])
        time = np.append(time, np.datetime64(uf_data['timestamp']).astype('timedelta64[s]').astype(np.int64))
    
    # using the values, create a zarr file and return it.


    # # !!! if over 24 hour fix not applied
    # delta = [(base_time + (h*3600+m*60+s).astype('timedelta64[s]')) for (h,m,s) in 
    #         zip(f1['Hour'][()], f1['Minute'][()], f1['Second'][()])] #delta is in seconds


    # # num_col = atb.shape[0] # number of rows, say 7903
    # num_cols = atb.shape[1] # number of cols, say 757

    # # maintain data shape
    # # delta = np.repeat(delta, num_cols)
    # # lon = np.repeat(lon, num_cols)
    # # lat = np.repeat(lat, num_cols)
    # # alt = np.repeat(alt, num_cols)

    
    # # atb = atb.flatten()
    
    # time correction.
    # time = (delta - np.datetime64('1970-01-01')).astype('timedelta64[s]').astype(np.int64)

    # !!! no lon alt lat correction for now.
    
    # sort data by time
    

    sort_idx = np.argsort(time)

    lon = lon[sort_idx]
    lat = lat[sort_idx]
    alt = alt[sort_idx]
    atb = atb[sort_idx]
    time = time[sort_idx]

    # # remove infinite atb value and negative altitude values using mask
    # mask = np.logical_and(np.isfinite(atb), alt > 0) # alt value when not avail is defaulted to -999.0
    # lon = lon[mask]
    # lat = lat[mask]
    # alt = alt[mask]
    # atb = atb[mask]
    # time = time[mask]

    # Now populate (append) the empty rows with modified data.
    z_location.append(np.stack([lon, lat, alt], axis=-1))
    z_ref.append(atb)

    n_time = np.append(n_time, time)

    idx = np.arange(0, n_time.size, chunk)
    chunks = np.zeros(shape=(idx.size, 2), dtype=np.int64)
    chunks[:, 0] = idx
    chunks[:, 1] = n_time[idx]
    z_chunk_id.append(chunks)

    epoch = np.min(n_time)
    n_time = (n_time - epoch).astype(np.int32)
    z_time.append(n_time)

    # save it.
    root.attrs.put({
        "campaign": campaign,
        "collection": collection,
        "dataset": dataset,
        "variables": variables,
        "renderers": renderers,
        "epoch": int(epoch)
    })

def downloadFromS3(bucket_name, s3_key, dest_dir):
    s3 = boto_client('s3')
    filename = s3_key.split('/')[3]
    dest_dir = '/tmp/npol_olympex/raw/'
    dest = dest_dir + filename
    if os.path.exists(dest_dir): shutil.rmtree(f"{dest_dir}")
    Path(dest_dir).mkdir(parents=True, exist_ok=True)
    print("Downloading file",s3_key,"from bucket",bucket_name, " into dir:", dest_dir)
    s3.download_file(
        Bucket = bucket_name,
        Key = s3_key,
        Filename = dest
    )
    return dest


def untarr(raw_file_dir, raw_file_path, filename):
    unzipped_file_path = raw_file_dir + filename.split(".")[0] # removing the .tar.gz # this is important
    if raw_file_path.endswith("tar.gz"):
        with tarfile.open(raw_file_path, "r:gz") as t:
            t.extractall(unzipped_file_path)
    elif raw_file_path.endswith("tar"):
        with tarfile.open(raw_file_path, "r:") as t:
            t.extractall(unzipped_file_path)
    return unzipped_file_path
# ------------------START--------------------------------

def data_pre_process(bucket_name, field_campaign, input_data_dir, output_data_dir, instrument_name):
    # for s3_raw_file_key in keys:
    # download each input file.
    # unzip it
    # go inside rhi_a dir,
    # list all the files.
    # for each file, run ingest.
    # generate point clouds.
    # upload all of the pointcloud files.

    s3_resource = boto3.resource('s3')
    s3bucket = s3_resource.Bucket(bucket_name)    
    keys = []
    for obj in s3bucket.objects.filter(
            Prefix=f"{field_campaign}/{input_data_dir}/{instrument_name}/olympex_npol"):
        keys.append(obj.key)

    raw_file_dir = '/tmp/npol_olympex/raw/' # local dir where raw file resides.

    for s3_key in keys:
        filename = s3_key.split('/')[3]
        # raw_file_path = downloadFromS3(bucket_name, s3_key, raw_file_dir) # inc file name
        # the raw file is for a single day. When unzipped, it will contain several data collected every 20 mins
        # unzipped_file_path = untarr(raw_file_dir, raw_file_path, filename)        
        unzipped_file_path = '/tmp/npol_olympex/raw/olympex_npol_2015-1203'
        # minutely_datas = [glob.glob(f"{unzipped_file_path}/*/rhi_a/*.uf.gz")[3]]
        minutely_datas = ["/tmp/npol_olympex/raw/olympex_npol_2015-1203/1203/rhi_a/olympex_NPOL1_20151203_232011_rhi_20-40.uf.gz"]
        for index, minute_data_path in enumerate(minutely_datas):
            print(f"\n{index}. converting for {minute_data_path}")
            # convert and save.
            # # SOURCE DIR.
            sdate = minute_data_path.split("/")[-1].split("_")[2]
            # CREATE A LOCAL DIR TO HOLD RAW DATA AND CONVERTED DATA
            folder = f"/tmp/npol_olympex/zarr/{sdate}-yolo/freq-{index}" # intermediate folder for zarr file (date + time), time rep by index.
            point_cloud_folder = f"{folder}/point_cloud" # intermediate folder for 3d tiles, point cloud
            if os.path.exists(folder): shutil.rmtree(f"{folder}")
            # os.mkdir(folder)
            Path(folder).mkdir(parents=True, exist_ok=True)
            # LOAD FROM SOURCE WITH NECESSARY PRE PROCESSING. CONVERT LEVEL 1B RAW FILES INTO ZARR FILE.
            # src_s3_path = f"s3://{bucket_name}/{s3_raw_file_key}"
            src_s3_path = "abc"
            ingest(folder, minute_data_path)
            # # CONVERT ZARR FILE INTO 3D TILESET JSON.
            generate_point_cloud("atb",  0,  1000000000000, folder, point_cloud_folder)
            # # UPLOAD CONVERTED FILES.
            files = os.listdir(point_cloud_folder)

def npol():
    # bucket_name = os.getenv('RAW_DATA_BUCKET')
    bucket_name="ghrc-fcx-field-campaigns-szg"
    field_campaign = "Olympex"
    input_data_dir = "instrument-raw-data"
    output_data_dir = "instrument-processed-data"
    instrument_name = "npol"
    data_pre_process(bucket_name, field_campaign, input_data_dir, output_data_dir, instrument_name)


npol()



0. converting for /tmp/npol_olympex/raw/olympex_npol_2015-1203/1203/rhi_a/olympex_NPOL1_20151203_232011_rhi_20-40.uf.gz
Accessing file to convert to zarr 
1. outer LOOP ################ 20
2. inner loop ################ 226
3. another inner loop ################ 1081
3. another inner loop ################ 1081
3. another inner loop ################ 1081
3. another inner loop ################ 1081
3. another inner loop ################ 1081
3. another inner loop ################ 1081
3. another inner loop ################ 1081
3. another inner loop ################ 1081
3. another inner loop ################ 1081
3. another inner loop ################ 1081
3. another inner loop ################ 1081
3. another inner loop ################ 1081
3. another inner loop ################ 1081
3. another inner loop ################ 1081


  time = np.append(time, np.datetime64(uf_data['timestamp']).astype('timedelta64[s]').astype(np.int64))


3. another inner loop ################ 1081
3. another inner loop ################ 1081
3. another inner loop ################ 1081
3. another inner loop ################ 1081
3. another inner loop ################ 1081
3. another inner loop ################ 1081
3. another inner loop ################ 1081
3. another inner loop ################ 1081
3. another inner loop ################ 1081
3. another inner loop ################ 1081
3. another inner loop ################ 1081
3. another inner loop ################ 1081
3. another inner loop ################ 1081
3. another inner loop ################ 1081
3. another inner loop ################ 1081
3. another inner loop ################ 1081
3. another inner loop ################ 1081
3. another inner loop ################ 1081
3. another inner loop ################ 1081
3. another inner loop ################ 1081
3. another inner loop ################ 1081
3. another inner loop ################ 1081
3. another inner loop ##########

### ROUGH

Generators

In [None]:
def fun(x):
    n = 0
    while n < x:
        yield n
        n += 1
z = fun(10)
next(z)
next(z)

In [None]:
next(z)