In [5]:
import math
import os
from datetime import datetime
# import numpy as np
import olieigra

# Directory handed to the crawler as the place to look for input files.
BRONZE_DATA_POR_PATH = '/usr/datalake/bronze/igra/data-por'
# Directory where the generated "*-data-timeseries.csv" files are written.
SILVER_TIMESERIES_PATH = '/usr/datalake/silver/igra/timeseries'
# Only input files whose name starts with this station id are processed.
STATION_ID = 'USM00072649'

In [6]:
# Create the output directory (including any missing parent directories) before
# anything tries to open a file inside it. exist_ok=True makes this cell safe to
# re-run when the directory is already there.
os.makedirs(SILVER_TIMESERIES_PATH, exist_ok=True)

In [7]:
class OlieTimeSeries(olieigra.Callbacks):
    """Callbacks that flatten one station's IGRA2 data file into a CSV.

    For every accepted observation, one CSV row is written per usable level. Each
    row carries the observation's header fields (date, hour, seasonal index)
    followed by the transformed level values (gph, pres, temp, dp, u, v).

    Output goes to a ``*.partial.csv`` file that is renamed to its final name in
    ``finish_file``, so a final-named CSV is only ever a fully written one.

    The callbacks are expected to be invoked per file as ``start_file``, then
    ``parse_header`` / ``parse_body`` for each observation, then ``finish_file``
    (NOTE(review): call order is defined by olieigra, not visible here).
    """

    # Height an observation must reach to be accepted. filter_body also stops
    # collecting levels once a kept level is at or above this value. Same units
    # as BodyModel.gph (presumably metres - confirm against olieigra).
    gph_top = 10000

    # Minimum number of usable levels an observation needs to be accepted.
    min_usable = 20

    def __init__(self, station_id: str, dst_path: str, min_effective_date: datetime):
        """Store the configuration and initialise the per-file state.

        Args:
            station_id: Filename prefix that selects the file to process.
            dst_path: Directory in which the output CSV is created.
            min_effective_date: Observations dated before this are filtered out.
        """
        super().__init__()
        self.station_id = station_id
        self.dst_path = dst_path
        self.min_effective_date = min_effective_date

        # Per-file state; reset again in start_file.
        self.filename = ''    # path of the temp file currently being written
        self.writer = None    # open handle on self.filename, or None
        self.hout = ''        # CSV prefix built from the most recent header
        self.filtered = 0     # observations dropped for being too old
        self.rejected = 0     # observations dropped by body validation

    def start_file(self, filename: str) -> bool:
        """Decide whether to process ``filename`` and, if so, open the temp output.

        Args:
            filename: Name of the candidate input file.

        Returns:
            True when the file will be processed; False when it is skipped
            because it is not a ``-data.txt`` file, is not for the target
            station, or its destination CSV already exists.
        """
        # An IGRA2 file should end with -data.txt
        if not filename.endswith('-data.txt'):
            return False

        # Only process the file for the station we care about
        if not filename.startswith(self.station_id):
            return False

        # Set the desired destination filename
        dst_filename = f'{self.dst_path}/{filename}'
        dst_filename = dst_filename.replace("-data.txt", "-data-timeseries.csv")

        # Skip this file if it has already been processed
        if os.path.exists(dst_filename):
            print(f'Skipping {filename}. Destination file already exists.')
            return False

        # If we got here, we are going to process the file
        print(f'Processing {filename}.')

        # Write to a temp file; finish_file renames it to the final name.
        self.filename = dst_filename.replace('.csv', '.partial.csv')
        self.writer = open(self.filename, 'w', encoding='UTF-8')
        self.hout = ''

        # Reset the record counts
        self.filtered = 0
        self.rejected = 0

        # Write the header row
        self.writer.write("effective_date,hour,day_num,gph,pres,temp,dp,u,v\n")

        # Tell olieigra to continue processing
        return True

    def finish_file(self, headers: int, rows: int):
        """Close the temp file, rename it to its final name and print a summary.

        Args:
            headers: Number of header records read from the input file.
            rows: Number of lines read from the input file.
        """
        # Nothing was opened (start_file declined the file), so there is nothing
        # to close or rename.
        if self.writer is None:
            return

        # Flush and close the temp file
        self.writer.close()
        self.writer = None

        # Rename it to the final filename
        dst_renamed = self.filename.replace('.partial.csv', '.csv')
        os.rename(self.filename, dst_renamed)

        # Observations that were neither filtered by date nor rejected by the
        # body checks are the ones that produced output.
        loaded = headers - self.filtered - self.rejected

        print(f" Read {headers} headers, {rows} lines. Filtered {self.filtered}. "
              f"Rejected {self.rejected}. Wrote {loaded} records.")

    def parse_header(self, header: olieigra.HeaderModel) -> bool:
        """Transform the header record and stage it for ``parse_body``.

        Args:
            header: Header of the current observation; ``year``, ``month``,
                ``day`` and ``hour`` are read from it.

        Returns:
            False when the observation is older than ``min_effective_date``
            (it is counted as filtered); True otherwise.
        """
        # Combine separate fields into a datetime
        effective_date = datetime(header.year, header.month, header.day)

        # Filter out the observations that are too old
        if effective_date < self.min_effective_date:
            self.filtered += 1
            return False

        # We need some number that is analogous to the amount of sunlight and the
        # season. The day of the year (1..366) is treated as an angle in degrees,
        # giving roughly -1 at the start of the year and +1 mid-year.
        # NOTE(review): this cycle is 360 days long, not a full year - confirm
        # the approximation is intended.
        day_num = -math.cos(math.radians(effective_date.timetuple().tm_yday))

        # The observation may be rejected due to body data issues. Save the header
        # values for now; parse_body writes them to the file, if appropriate.
        self.hout = f'{effective_date:%Y-%m-%d},{header.hour},{day_num:.2f}'

        # Continue the processing
        return True

    def parse_body(self, body: list[olieigra.BodyModel]) -> None:
        """Validate the observation's levels and write one CSV row per kept level.

        Args:
            body: Level records of the current observation.
        """
        # Remove non-pressure records and records with bad data
        levels = self.filter_body(body)

        # If the obs failed validation checks, skip it
        if not levels:
            self.rejected += 1
            return

        # Every row repeats the staged header fields in front of the level values.
        for gph, pres, temp, dp, u, v in levels:
            self.writer.write(f'{self.hout},{gph},{pres},{temp},{dp},{u},{v}\n')

    def filter_body(self, body: list[olieigra.BodyModel]) -> list[list[float]]:
        """Select and transform the usable levels of one observation.

        Levels are taken in the order given. Collection stops once a kept level
        has reached ``gph_top``. A level is kept only if it is not a
        non-pressure record and none of dpdp, rh, temp, wdir, wspd and gph is
        NaN (rh is required even though it is not part of the output).

        Args:
            body: Level records of the current observation.

        Returns:
            The transformed rows (see ``transform_body``) when the observation
            has at least ``min_usable`` kept levels, includes a kept level of
            type '21', and its last kept level is at or above ``gph_top``;
            otherwise an empty list.
        """
        result = []
        has_surface = False
        last_gph = -1

        for item in body:
            # Once a kept level has reached the top height there is enough data
            # to interpolate; anything above it is not needed.
            if last_gph >= self.gph_top:
                break

            # Skip non-pressure records
            if item.type[0] == '3':
                continue

            # Skip records with bad or missing data.
            # NOTE(review): item.pres is written to the output but is not part
            # of this check - confirm it can never be NaN for these records.
            required = (item.dpdp, item.rh, item.temp, item.wdir, item.wspd, item.gph)
            if any(math.isnan(value) for value in required):
                continue

            # If we got here, the record passed validation. Add it to the results.
            result.append(self.transform_body(item))

            # Remember that a valid surface sample was seen
            if item.type == '21':
                has_surface = True

            last_gph = item.gph

        # Reject the entire obs if there are too few valid samples, there is no
        # valid surface sample, or the balloon didn't make it to the top height.
        if len(result) < self.min_usable or not has_surface or last_gph < self.gph_top:
            return []

        # Passed validation, return the results
        return result

    def transform_body(self, item: olieigra.BodyModel) -> list[float]:
        """Convert one level record into the values written to the CSV.

        Args:
            item: Level record; ``gph``, ``pres``, ``temp``, ``dpdp``, ``wdir``
                and ``wspd`` are read from it.

        Returns:
            ``[gph, pres, temp, dp, u, v]`` where gph is passed through, pres is
            divided by 100, temp by 10, dp is ``(temp - dpdp) / 10``, and u, v
            are the wind components rounded to one decimal place.
        """
        gph = item.gph
        pres = item.pres / 100.0
        temp = item.temp / 10.0
        # dpdp is subtracted from temp, i.e. it is treated as a dewpoint depression.
        dp = (item.temp - item.dpdp) / 10.0

        # Convert wind from direction (degrees) and speed to u,v components.
        # The minus signs turn the direction the wind comes FROM into the
        # direction it blows TOWARDS (assumes wdir is a meteorological
        # "from" direction - TODO confirm).
        wrad = math.radians(item.wdir)
        u = round(-item.wspd * math.sin(wrad) / 10.0, 1)
        v = round(-item.wspd * math.cos(wrad) / 10.0, 1)

        return [gph, pres, temp, dp, u, v]

In [8]:
# Wire the pipeline together: the callbacks object holds the filtering and CSV
# output logic, the Reader is given those callbacks, and the Crawler is given the
# Reader. Observations dated before 1990-01-01 are counted as "filtered" and are
# not written.
callbacks = OlieTimeSeries(STATION_ID, SILVER_TIMESERIES_PATH, datetime(1990, 1, 1))
reader = olieigra.Reader(callbacks=callbacks)
crawler = olieigra.Crawler(reader=reader)

# Start the run on the bronze directory (presumably the crawler feeds each file
# it finds there to the reader - confirm against olieigra). A file whose output
# CSV already exists is skipped by OlieTimeSeries.start_file, so re-running this
# cell does not redo finished work.
crawler.crawl(BRONZE_DATA_POR_PATH)

Processing USM00072649-data.txt.
 Read 28130 headers, 4133846 lines. Filtered 5951. Rejected 2828. Wrote 19351 records.
