In [None]:
import numpy as np
import pandas as pd
from sklearn.neural_network import MLPRegressor
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
from astropy.time import Time
from astropy.coordinates import solar_system_ephemeris, EarthLocation
from astropy.coordinates import get_body_barycentric, get_body, get_moon

In [None]:
#TEAM VNS HELPERS https://github.com/ArnaudFickinger/astronomical_dataset_generator

#add noise to data, data must be in [x,y,z] format or you know what you are doing
#possible type of noise are gaussian/uniform random noise
def addnoise(data,mu,sigma,typ="gaussian"):
    print("adding ",typ," noise to data......")
    print("mu=",mu," and sigma=",sigma)
    ldata=np.copy(data)
    shape=np.shape(ldata)
    ldata=ldata.flatten()
    size=len(ldata)
    if typ=="gaussian":
        ldata+=np.random.normal(mu,sigma,size=size)
    elif typ=="uniform":        
        ldata+=sigma*(np.random.rand(size)-0.5+mu)
    else:
        print("invalid type of noise!")
        print("available types of noise are gaussian or uniform.")
        exit(1)
    return ldata.reshape(shape)
#randomly delete percentage of data assigned by percent
#you may want to do this to sparsify the data due random conditions like weathers, people in charge of observing ask for a day off or so.
def sparsify(data,percent):
    n=len(data)
    ndrawn=int(n*(1-percent))
    print("sparsify ",n,"data into ",n-ndrawn," data...")
    dellist=random.sample(range(n),ndrawn)
    return np.delete(data,dellist,axis=0)

In [None]:
#DataFrame stack: https://stackoverflow.com/questions/53509623/2d-array-to-two-columns-in-dataframe

In [None]:
#helper functions to reset time so that it cycle and function to control the scale of the data
def reset_time(Time,cycle):
    New_time = Time - Time.min(axis = 0)
    mod_time = New_time % cycle
    return mod_time
    
def scale(X,max_val):
    n = len(X)
    X_std = (np.array([((X[i] - X.min(axis=0)) / (X.max(axis=0) - X.min(axis=0))) for i in range(0,n)]))*max_val 
    return X_std

### Learning 1D function t ---> [x, y, z]

In [None]:
from sklearn.preprocessing import PolynomialFeatures
from datetime import datetime, timedelta
from time import mktime

In [None]:
venus = pd.read_csv("/Venus2x.csv")
## note: csv will contain data for 2 orbits. Need at least one full orbit for train data
## cross-reference data generation notebook for orbit times
T = venus.copy()


T["Time"] = pd.to_datetime(T["Time"], infer_datetime_format=True)
unix_secs = T["Time"].apply(lambda t: mktime(t.timetuple()))
T["Time"] = unix_secs

T_train = np.array(T["Time"])[:,np.newaxis]
target = np.array(T[["X", "Y", "Z"]])
poly = PolynomialFeatures(7)
poly = poly.fit_transform(T_train)
# poly.shape
w = np.linalg.inv(poly.T@poly)@poly.T@target

pred_mat = poly@w - target
fig = plt.figure()
ax = fig.gca(projection='3d')
ax.plot(pred_mat[:, 0], pred_mat[:, 1], pred_mat[:, 2])
ax.plot(T['X'], T['Y'], T['Z'])
ax.legend(['Train','Actual'])
fig.show()
# T_train

FileNotFoundError: ignored

In [None]:
from google.colab import drive 
drive.mount('/content/gdrive')

Mounted at /content/gdrive


Fourier Featurization, can someone check these functions? Below that ive done a train/test example on venus


In [None]:
import math
#standardizing everything to and element in [0,1] (may not need to do this)
def fourier_featurize(X,d):
    n = len(X)
    X = np.array(X)
    X_std = np.array([((X[i] - X.min(axis=0)) / (X.max(axis=0) - X.min(axis=0))) for i in range(0,n)])
    features = np.zeros((n,d),dtype=complex)
    for j in range(0,len(X)):
        features[j,] = [np.exp(X_std[j]*1j*2*np.pi*i) for i in range(0,d)]
    return features



In [None]:
#please pass one series at a time, i have made the function for a whole matrix yet
#This function works well on decomposing ascention and declenation into frequencies and then forecasting with these frequencies
#Additionally, it will allow you to pass in a hyperparameter specifying K top frequencies to use for prediction (k is max n/2).
#This function will plot the ascention or declenation if you want as well
def fourier_forecast(X,length,top_k= None,plot = False): 
    #transform the data
    
    #if top_k isnt specified set to full set of frequencies in the data
    if top_k == None:
        top_k = int((X.shape[0]-1)/2)
    
    dft = np.fft.fft(X,axis=0)[1:]
    n = X.shape[0]
    training_mean = np.mean(X)
    pred = np.array(np.zeros((length)),dtype = complex) 
    
    
    #find top k frequencies
    ps = abs(dft[0:int(n/2)])**2/n
    sorted_fourier = np.sort(ps.flatten())[::-1]
    top_fourier = sorted_fourier[:top_k]
    
    #grab the index of the strongest frequencies
    freqs = [list(ps).index(i) for i in list(top_fourier)]
    fourier = dft[freqs]
    d = fourier.shape[0]
    
    
    
    #loop through the predictions, making sure we keep the frequencies consistent
    #we need to look through the indexes as well, need to multiply by two to take into account the symmetry of the spectrum
    for t in range(0,length):
        pred[t] = sum(np.array([2*fourier[k] * np.exp(2*np.pi/n*(freqs[k]+1)*(t)*1j) for k in range(0,d)]))
        pred[t] = pred[t]/n
        
       # percentile = length/10 
       # if t % percentile == 0:
       #     print(int(t/length*100),'%')
            
    pred = (pred).real + training_mean
    
    #plotting
    if plot == True:
        if n == length:
            plt.plot(X)
            plt.plot(pred)
        elif n > length:
            diff = n - length 
            pred_new = np.append(np.array(pred) , np.array([training_mean]*diff)) 
            plt.plot(X)
            plt.plot(pred_new)
        elif n < length:
            diff = length - n
            X_new = np.append(np.array(X),np.array([training_mean]*diff))
            plt.plot(X_new)
            plt.plot(pred)
    plt.show()
   
    return pred


#diagniostic function to plot the periodogram of a series    
def plot_spectrum(X):
    n = X.shape[0]
    
    fft_data = np.fft.fft(X,axis=0)
    ps = abs(fft_data[0:(int(n/2))])**2/n

    freqs = np.array(list(range(0,int(n/2) )))
    idx = np.argsort(freqs)
    
  
    fig, ax = plt.subplots()
    ax.stem(freqs[1:100], ps[1:100], markerfmt=' ')
    plt.show()

In [None]:
T = venus.copy()

T["Time"] = pd.to_datetime(T["Time"], infer_datetime_format=True)
unix_secs = T["Time"].apply(lambda t: mktime(t.timetuple()))
T["Time"] = unix_secs

T_train = np.array(T["Time"])[:7000,np.newaxis]
T_test = np.array(T["Time"])[7000:,np.newaxis]
#second argument is the number of frequencies so i suppose this is a hyper parameter
d=2
fourier = fourier_featurize(T_train,d)
fourier_test = fourier_featurize(T_test,d)
target = np.array(T[["X", "Y", "Z"]])[:7000,]
test = np.array(T[["X", "Y", "Z"]])[7000:,]

w = np.linalg.inv(fourier.T@fourier)@fourier.T@target

preds_train = fourier@w
preds_transformed = reverse_fourier(preds_train,w,d)
pred_mat = preds_transformed - target
preds_test = fourier_test@w
preds_transformed_test = reverse_fourier(preds_test,w,d)
pred_mat_test = preds_transformed_test - test

fig = plt.figure()
ax = fig.gca(projection='3d')
ax.plot(pred_mat[:, 0], pred_mat[:, 1], pred_mat[:, 2])
ax.plot(T['X'], T['Y'], T['Z'])
ax.legend(['Train','Actual'])
fig.show()


fig = plt.figure()
ax = fig.gca(projection='3d')
ax.plot(pred_mat_test[:, 0], pred_mat_test[:, 1], pred_mat_test[:, 2])
ax.plot(T['X'], T['Y'], T['Z'])
ax.legend(['Test','Actual'])
fig.show()

NameError: ignored

In [None]:
MSE = np.mean((preds_transformed - target)**2)
print(MSE)

MSE_test = np.mean((preds_transformed_test - test)**2)
MSE_test