In [3]:
import numpy as np
import tqdm
import matplotlib.pyplot as plt
import time
from collections import namedtuple
from datetime import datetime
import pandas as pd
import os

In [7]:
def to_timestamp(date, time):
    """Convert a date string and a time string to integer nanoseconds since the epoch.

    Parameters
    ----------
    date : str
        Calendar date formatted as ``YYYY-MM-DD``.
    time : str
        Time of day formatted as ``HH:MM:SS``, optionally followed by a
        fractional part (``HH:MM:SS.ffffff``; ``%f`` accepts up to six digits).
        Note: inside this function the name shadows the ``time`` module.

    Returns
    -------
    int
        Nanoseconds since the Unix epoch. The parsed datetime is naive, so
        ``datetime.timestamp()`` interprets it in the local time zone.

    Raises
    ------
    ValueError
        If the combined string does not match the expected format.
    """
    fmt = "%Y-%m-%d %H:%M:%S" + (".%f" if "." in time else "")
    parsed = datetime.strptime(f"{date} {time}", fmt)

    # Convert the whole seconds and the sub-second part separately, using
    # integer arithmetic. Multiplying a float epoch value by 1e9 needs more
    # than float64's 53 mantissa bits and silently corrupts the low digits.
    whole_seconds = int(parsed.replace(microsecond=0).timestamp())
    return whole_seconds * 10**9 + parsed.microsecond * 1000

def load_period_file(path):
    """Load a ``<ID>_periods.txt`` file into a DataFrame (one row per data line).

    Parameters
    ----------
    path : str
        Path to the period file. The numeric object ID is parsed from the
        file name, which must end in ``_periods.txt``.

    Returns
    -------
    pandas.DataFrame
        Columns ``ID``, ``Name``, ``Timestamp`` (ns, from ``to_timestamp``),
        ``Track`` (int), ``MJD`` (float) and ``Period`` (int nanoseconds).

    Raises
    ------
    ValueError
        If the file-name prefix is not an integer or a data line does not
        contain exactly five whitespace-separated fields.
    IndexError
        If the file is empty or its first line has fewer than four tokens.
    """
    # basename() copes with whichever separators the running platform accepts,
    # unlike a hard-coded split on '/'.
    file_name = os.path.basename(path)
    ID = int(file_name[:-len('_periods.txt')])

    with open(path, 'r') as period_file:
        lines = period_file.readlines()

    # The object name is the fourth whitespace-separated token of the first
    # line (presumably a '#' header line - verify against the file format).
    name = lines[0].split()[3]

    data = []
    for line in lines:
        # Skip comment lines and blank lines; a blank line would otherwise
        # fail the five-way unpack below.
        if line.startswith("#") or not line.strip():
            continue
        date, time, track, mjd, period = line.split()
        data.append([ID,
                     name,
                     to_timestamp(date, time),
                     int(track),
                     float(mjd),
                     # Seconds -> nanoseconds. round() instead of int(): the
                     # float product is often a hair below the exact value
                     # (0.29 * 1e9 == 289999999.99999994), which int() would
                     # truncate to one nanosecond too few.
                     round(float(period) * 10**9)])

    return pd.DataFrame(data,
                        columns=["ID", "Name", "Timestamp", "Track", "MJD", "Period"])

def load_object_file(path):
    """Load a ``<ID>_tracks.txt`` measurement file into a DataFrame.

    Parameters
    ----------
    path : str
        Path to the tracks file. The numeric object ID is parsed from the
        file name, which must end in ``_tracks.txt``.

    Returns
    -------
    pandas.DataFrame
        One row per data line with columns ``ID``, ``Track``, ``Timestamp``
        (ns, from ``to_timestamp``), ``StdMag``, ``Mag``, ``Filter``,
        ``Penumbra``, ``Distance``, ``Phase`` and ``Channel``.

    Raises
    ------
    ValueError
        If the file-name prefix is not an integer or a data line does not
        contain exactly ten whitespace-separated fields.
    """
    # basename() copes with whichever separators the running platform accepts,
    # unlike a hard-coded split on '/'.
    file_name = os.path.basename(path)
    ID = int(file_name[:-len('_tracks.txt')])

    with open(path, 'r') as object_file:
        lines = object_file.readlines()

    data = []
    for line in tqdm.tqdm(lines, desc=f"Load object file {path}: "):
        # Skip comment lines and blank lines; a blank line would otherwise
        # fail the ten-way unpack below.
        if line.startswith("#") or not line.strip():
            continue
        # `filter_name` rather than `filter`, to avoid shadowing the builtin.
        (date, time, stMag, mag, filter_name,
         penumbra, distance, phase, channel, track) = line.split()

        data.append([ID,
                     int(track),
                     to_timestamp(date, time),
                     float(stMag),
                     float(mag),
                     filter_name,
                     float(penumbra),
                     float(distance),
                     float(phase),
                     int(channel)])

    return pd.DataFrame(data,
                        columns=["ID", "Track", "Timestamp", "StdMag",
                                 "Mag", "Filter", "Penumbra", "Distance",
                                 "Phase", "Channel"])


def load_light_curves(periods, data_dir=None):
    """Build per-period light curves for the object described by ``periods``.

    Parameters
    ----------
    periods : pandas.DataFrame
        Period table with at least the columns ``ID``, ``Track`` and
        ``Period`` (as produced by ``load_period_file``). Only the ID of the
        first row is used to locate the tracks file.
    data_dir : str, optional
        Directory containing ``<ID>_tracks.txt``. Defaults to the
        notebook-level ``PATH`` constant, which must then be defined before
        the call.

    Returns
    -------
    list
        The concatenated results of ``create_tracks_by_period`` for every
        period row that has measurements. Empty when ``periods`` is empty or
        the tracks file does not exist.
    """
    if len(periods) == 0:
        return []

    # Read the ID through its column: materialising a whole row as a Series
    # can upcast integers to float when all columns are numeric.
    ID = periods["ID"].iloc[0]

    if data_dir is None:
        data_dir = PATH
    object_file_name = f"{data_dir}/{ID}_tracks.txt"

    if not os.path.isfile(object_file_name):
        return []

    df_tracks = load_object_file(object_file_name)

    light_curves = []

    # Iterate the two needed columns directly (same dtype-preservation reason
    # as above) instead of iterrows().
    track_periods = zip(periods["Track"], periods["Period"])
    for track_n, time_period in tqdm.tqdm(track_periods,
                                          total=periods.shape[0],
                                          desc="Load light curves: "):
        measurements = df_tracks[df_tracks["Track"] == track_n]
        if len(measurements) == 0:
            continue

        light_curves.extend(create_tracks_by_period(measurements, time_period))

    return light_curves

# One period-long segment of a track:
#   light_curve - array of [StdMag, phase] rows, phase = (timestamp - start) / period
#   start       - timestamp at which the segment's period window begins
#   period      - window length, in the same unit as the timestamps
Track = namedtuple("Track", "light_curve start period")

def create_tracks_by_period(measurements, time_period):
    """Cut one track's measurements into consecutive period-long segments.

    The first window starts at the earliest timestamp; each following window
    starts where the previous one ended. A measurement exactly on a window's
    end still belongs to that window (phase 1.0). Windows that contain no
    measurement produce no ``Track``.

    Parameters
    ----------
    measurements : pandas.DataFrame
        Must contain the columns ``Timestamp`` and ``StdMag``.
    time_period : int or float
        Window length in the same unit as ``Timestamp``; must be positive.

    Returns
    -------
    list of Track
        One entry per non-empty window, in chronological order. Empty when
        ``measurements`` has no rows.

    Raises
    ------
    ValueError
        If ``time_period`` is not positive (the window could never advance
        past a later measurement, so the loop would not terminate).
    """
    if time_period <= 0:
        raise ValueError(f"time_period must be positive, got {time_period!r}")
    if len(measurements) == 0:
        return []

    ordered = measurements.sort_values(by="Timestamp")
    # Pull the two needed columns as native Python scalars. Row-wise iteration
    # (iterrows) is slow and upcasts integer timestamps to float whenever all
    # columns are numeric, losing nanosecond precision.
    timestamps = ordered["Timestamp"].tolist()
    magnitudes = ordered["StdMag"].tolist()

    light_curves = []
    start = timestamps[0]
    end = start + time_period
    curve = []

    for t, magnitude in zip(timestamps, magnitudes):
        if t > end:
            # Close the current window before moving on.
            if curve:
                light_curves.append(Track(np.array(curve), start, time_period))
                curve = []
            # Advance window by window until `t` falls inside one.
            while t > end:
                start = end
                end = end + time_period

        curve.append([magnitude, (t - start) / time_period])

    if curve:
        light_curves.append(Track(np.array(curve), start, time_period))

    return light_curves


def plot_curve(data, time):
    """Scatter-plot the non-zero entries of ``data`` against ``time``.

    Parameters
    ----------
    data : array-like supporting boolean-mask indexing
        Values for the vertical axis; entries equal to 0 are left out.
    time : array-like supporting boolean-mask indexing
        Horizontal positions, indexed with the same mask as ``data``.
        Note: inside this function the name shadows the ``time`` module.
    """
    # Zeros are dropped from the plot - presumably because the resampling
    # helpers leave empty bins at 0 (confirm against the caller).
    keep = data != 0
    values = data[keep]
    positions = time[keep]

    plt.scatter(positions, values)
    plt.show()

In [5]:
def SMA(light_curve, size, window_size):
    """Resample a light curve onto ``size`` points with a trailing moving average.

    The time axis is stepped in increments of ``1 / (size + window_size)``,
    starting at one window length. Output point ``i`` is the mean of column 0
    over the rows whose column-1 time lies in ``[t - window, t]``; points with
    an empty window stay 0.

    Parameters
    ----------
    light_curve : numpy.ndarray
        2-D array of ``[value, time]`` rows. Assumes rows are sorted by time
        (the two cursors only move forward) - TODO confirm with callers.
    size : int
        Number of output samples.
    window_size : int
        Window length, expressed in steps.

    Returns
    -------
    numpy.ndarray
        Float array of length ``size``.
    """
    n_rows = len(light_curve)
    step = 1 / (size + window_size)
    window_t = step * window_size

    averaged = np.zeros(size)
    first, past_last = 0, 0  # window covers rows first .. past_last - 1
    t = window_t

    for out_idx in range(size):
        # Every row is already behind the window: the rest stays zero.
        if first == n_rows:
            break

        # Extend the right edge over rows at or before t ...
        while past_last < n_rows and light_curve[past_last][1] <= t:
            past_last += 1
        # ... and drop rows older than one window length.
        while first < n_rows and light_curve[first][1] < t - window_t:
            first += 1

        in_window = light_curve[first:past_last, 0]
        if len(in_window) > 0:
            averaged[out_idx] = np.mean(in_window)

        t += step

    return averaged

def CMA(light_curve, size, window_size):
    """Resample a light curve onto ``size`` points with a centred moving average.

    The time axis is stepped in increments of ``1 / (size + 2 * window_size)``,
    starting at one half-window. Output point ``i`` is the mean of column 0
    over the rows whose column-1 time lies in ``[t - window, t + window]``;
    points with an empty window stay 0.

    Parameters
    ----------
    light_curve : numpy.ndarray
        2-D array of ``[value, time]`` rows. Assumes rows are sorted by time
        (the two cursors only move forward) - TODO confirm with callers.
    size : int
        Number of output samples.
    window_size : int
        Half-width of the window, expressed in steps.

    Returns
    -------
    numpy.ndarray
        Float array of length ``size``.
    """
    n_rows = len(light_curve)
    step = 1 / (size + 2*window_size)
    window_t = step * window_size

    averaged = np.zeros(size)
    first, past_last = 0, 0  # window covers rows first .. past_last - 1
    t = window_t

    for out_idx in range(size):
        # Every row is already behind the window: the rest stays zero.
        if first == n_rows:
            break

        # Extend the right edge over rows up to half a window ahead of t ...
        while past_last < n_rows and light_curve[past_last][1] <= t + window_t:
            past_last += 1
        # ... and drop rows more than half a window behind t.
        while first < n_rows and light_curve[first][1] < t - window_t:
            first += 1

        in_window = light_curve[first:past_last, 0]
        if len(in_window) > 0:
            averaged[out_idx] = np.mean(in_window)

        t += step

    return averaged


def resize_curve(light_curve, size):
    """Bin a light curve onto ``size`` evenly spaced samples by averaging.

    Sample ``i`` sits at ``t = i / size``. Starting from a cursor ``j``, every
    remaining row whose time (index 1) is below ``t + 0.5 * step`` contributes
    its value (index 0) to the sample. The cursor then advances by the number
    of those remaining rows whose time is below ``t``. The mean is taken in
    float32; samples with no contributing rows stay 0.

    NOTE(review): for time-sorted input the effective window is therefore
    ``[t - step, t + 0.5 * step)``, which is asymmetric and overlaps the
    previous sample. A symmetric half-step window would need the cursor to
    advance on ``time < t - 0.5 * step``. Confirm which is intended.

    Parameters
    ----------
    light_curve : sequence of ``[value, time]`` rows
        Assumed sorted by time - TODO confirm with callers.
    size : int
        Number of output samples.

    Returns
    -------
    numpy.ndarray
        Float array of length ``size``.
    """
    resampled = np.zeros(size)
    step = 1 / size
    j = 0  # number of leading rows no longer considered

    for i in range(size):
        t = step * i
        remaining = light_curve[j:]

        # Values of all remaining rows that fall before the upper bin edge.
        selected = [row[0] for row in remaining if row[1] < t + 0.5 * step]
        # Rows strictly before the bin centre are skipped from the next sample on.
        j += sum(1 for row in remaining if row[1] < t)

        selected = np.array(selected, dtype=np.float32)
        if len(selected) > 0:
            resampled[i] = np.mean(selected)

    return resampled

In [8]:
# Directory holding the `<ID>_periods.txt` / `<ID>_tracks.txt` input files.
# Set the CHARON_DATA_DIR environment variable to point at another location
# (e.g. "/home/daniel/Desktop/charon_share/test_data") instead of editing the
# notebook; the previous hard-coded location remains the default.
PATH = os.environ.get("CHARON_DATA_DIR", "c:/Users/danok/Documents/charon_share")
# NOTE(review): presumably the target length for resampled light curves; it is
# not referenced in the cells visible here - confirm.
ARRAY_SIZE = 200

In [9]:
# Load the period table of object 6816, then cut that object's tracks file
# into period-long light-curve segments (one list entry per non-empty segment).
periods = load_period_file(f"{PATH}/6816_periods.txt")
lcs = load_light_curves(periods)

Load object file c:/Users/danok/Documents/charon_share/6816_tracks.txt: 100%|█| 145974/145974 [00:02<00:00, 53146.51it/
Load light curves: 100%|███████████████████████████████████████████████████████████████| 18/18 [00:07<00:00,  2.34it/s]


In [10]:
# Number of period-long light-curve segments extracted for this object.
len(lcs)

373

In [16]:
# Synthetic arrays for the append-and-reload experiment on data.npy below.
# A seeded generator makes a fresh "Restart & Run All" reproduce the same data.
rng = np.random.default_rng(0)
a = rng.random((10, 100))
b = rng.random((20, 100))

In [28]:
# First write of the experiment: open with 'wb' so the file is started from
# scratch. With 'ab' every re-run of the notebook would append another copy of
# `a` to whatever data.npy already contained.
with open("data.npy", 'wb') as f:
    np.save(f, a)

In [29]:
# Append `b` as a further, self-contained .npy record after the existing file content.
# NOTE: not idempotent - every re-run of this cell appends another copy of `b`.
with open("data.npy", 'ab') as f:
    np.save(f, b)

In [30]:
from pathlib import Path
import os

In [35]:
# Read back every array stored in data.npy. Each np.save call wrote its own
# .npy record, so np.load is called repeatedly until the read position reaches
# the end of the file.
with open("data.npy", 'rb') as f:
    fsz = os.fstat(f.fileno()).st_size
    print(fsz)
    chunks = [np.load(f)]
    while f.tell() < fsz:
        print(f.tell())
        chunks.append(np.load(f))

# Stack once at the end: calling np.vstack inside the loop re-copies the
# accumulated result for every record. A single record is returned as loaded.
out = chunks[0] if len(chunks) == 1 else np.vstack(chunks)
    

24256
8128


In [33]:
# Expect (30, 100): the 10 rows of `a` followed by the 20 rows of `b`.
out.shape

(30, 100)