In [3]:
import os
import h5py
import struct
import numpy as np
import configparser
import matplotlib.pyplot as plt

In [4]:
config = configparser.ConfigParser()
config.read('file_paths.ini')
config.sections()

['MAIN_DATA_PATH', 'FY3', 'HIMAWARI']

In [5]:
# File paths for FengYun-3E data.
FY3E_DATA_FOLDERS_LOCATION = config['FY3']['FY3E_MAIN_DATA_LOCATION']
FY3E_RAW_DATA_LOCATION = config['FY3']['FY3E_RAW_DATA_LOCATION']
FY3E_L0_DATA_LOCATION = config['FY3']['FY3E_L0_DATA_LOCATION']
FY3E_L1_DATA_LOCATION = config['FY3']['FY3E_L1_DATA_LOCATION']
FY3E_L1_GOE_DATA_LOCATION = config['FY3']['FY3E_L1_GOE_DATA_LOCATION']
FY3E_L1_IMAGE_DATA_LOCATION = config['FY3']['FY3E_L1_IMAGE_DATA_LOCATION']
FY3E_L1_GEOREF_IMAGE_DATA_LOCATION = config['FY3']['FY3E_L1_GEOREF_IMAGE_DATA_LOCATION']

In [6]:
fy3e_L0_files_paths = [FY3E_L0_DATA_LOCATION + file_name for file_name in os.listdir(FY3E_L0_DATA_LOCATION)]
fy3e_l1_geo_files_paths = [FY3E_L1_GOE_DATA_LOCATION + file_name for file_name in os.listdir(FY3E_L1_GOE_DATA_LOCATION)]
fy3e_l1_image_files_paths = [FY3E_L1_IMAGE_DATA_LOCATION + file_name for file_name in os.listdir(FY3E_L1_IMAGE_DATA_LOCATION)]

print('L0 files -', fy3e_L0_files_paths)
print()
print('GEO files -', fy3e_l1_geo_files_paths)
print()
print('Image files -', fy3e_l1_image_files_paths)

L0 files - ['E:/Satellite data/FY-3E/L0/Y3E_08534_230226070855_7860R_MERSI.DAT']

GEO files - ['E:/Satellite data/FY-3E/L1/geo/FY3E_MERSI_GRAN_L1_20230228_2215_GEOQK_V0.HDF']

Image files - ['E:/Satellite data/FY-3E/L1/image/FY3E_MERSI_GRAN_L1_20230228_2215_0250M_V0.HDF', 'E:/Satellite data/FY-3E/L1/image/FY3E_MERSI_GRAN_L1_20240301_0915_0250M_V0.HDF', 'E:/Satellite data/FY-3E/L1/image/FY3E_MERSI_GRAN_L1_20230128_0435_0250M_V0.HDF', 'E:/Satellite data/FY-3E/L1/image/FY3E_MERSI_GRAN_L1_20230128_0935_0250M_V0.HDF', 'E:/Satellite data/FY-3E/L1/image/FY3E_MERSI_GRAN_L1_20230131_0520_0250M_V0.HDF', 'E:/Satellite data/FY-3E/L1/image/FY3E_MERSI_GRAN_L1_20230131_0840_0250M_V0.HDF', 'E:/Satellite data/FY-3E/L1/image/FY3E_MERSI_GRAN_L1_20230131_0845_0250M_V0.HDF', 'E:/Satellite data/FY-3E/L1/image/FY3E_MERSI_GRAN_L1_20230131_1020_0250M_V0.HDF', 'E:/Satellite data/FY-3E/L1/image/FY3E_MERSI_GRAN_L1_20230131_1745_0250M_V0.HDF', 'E:/Satellite data/FY-3E/L1/image/FY3E_MERSI_GRAN_L1_20230131_2100_0250

In [7]:
def read_uint12(data_chunk: bytes):
    data = np.frombuffer(data_chunk, dtype=np.uint8)
    fst_uint8, mid_uint8, lst_uint8 = np.reshape(data, (data.shape[0] // 3, 3)).astype(np.uint16).T
    fst_uint12 = (fst_uint8 << 4) + (mid_uint8 >> 4)
    snd_uint12 = ((mid_uint8 % 16) << 8) + lst_uint8
    return np.reshape(np.concatenate((fst_uint12[:, None], snd_uint12[:, None]), axis=1), 2 * fst_uint12.shape[0])

In [24]:
from typing import Optional
from io import RawIOBase

In [25]:
class FY3EL0Header:
    def __init__(self) -> None:
        pass

In [26]:
class FY3EL0TransportPacket:
    def __init__(self) -> None:
        pass

In [27]:
class FY3EL0Data:
    def __init__(self, 
                 header: FY3EL0Header, 
                 extra_packets: list[FY3EL0TransportPacket], 
                 packets: list[FY3EL0TransportPacket]) -> None:
        
        self.__header = header
        self.__extra_packets = extra_packets
        self.__packets = packets

    @property
    def header(self) -> FY3EL0Header:
        return self.__header
    
    @property
    def extra_packets(self) -> list[FY3EL0TransportPacket]:
        return self.__extra_packets
    
    @property
    def packets(self) -> list[FY3EL0TransportPacket]:
        return self.__packets

In [31]:
class FY3EL0Parser:
    HEADER_SIZE = 1331906
    SYNCHROSERIES_SIZE = 10
    TRANSPORT_PACKET_SIZE = 9226

    SYNCHROSERIES = b''
    
    def __init__(self, stream: RawIOBase) -> None:
        self.stream = stream
        
    def parse_header(self) -> FY3EL0Header:
        self.stream.read(FY3EL0Parser.HEADER_SIZE)

        return FY3EL0Header()
    
    def parse_packet(self) -> Optional[FY3EL0TransportPacket]:
        transport_packet = self.stream.read(FY3EL0Parser.TRANSPORT_PACKET_SIZE)

        if transport_packet[:FY3EL0Parser.SYNCHROSERIES_SIZE] = 

    def parse(self) -> FY3EL0Data:
        header = self.parse_header()
        
        extra_packets = []
        while packet := self.parse_packet():
            extra_packets.append(packet)

        packets = []
        while packet := self.parse_packet():
            packets.append(packet)
        
        return FY3EL0Data(header, extra_packets, packets)

In [32]:
data = open(fy3e_L0_files_paths[0], 'br', buffering=0)

In [33]:
tmp = FY3EL0Parser(data)

b'x\xb1\x11\x00FY3E  '
