In [1]:
!pip install spicy
import numpy as np
import pandas as pd
import spicy
import matplotlib.pyplot as plt
from sklearn.metrics import mean_squared_error
from sklearn.metrics import mean_absolute_error
from google.colab import drive
drive.mount('/content/drive')

Collecting spicy
  Downloading spicy-0.16.0-py2.py3-none-any.whl.metadata (310 bytes)
Downloading spicy-0.16.0-py2.py3-none-any.whl (1.7 kB)
Installing collected packages: spicy
Successfully installed spicy-0.16.0
Mounted at /content/drive


## Define Math Functions.

In [2]:
global number_of_characters
number_of_characters = 2
def get_equation(n_equation):
  if n_equation == 0:
    return linearfunc
  elif n_equation == 1:
    return sinfunc
  elif n_equation == -1:
    return skewsin
  else:
    return ntrigs

def linearfunc(time, a, b): #Associate integer = 0
  return a * time + b

def sinfunc(time, mean, amplitude, frequency, h_shift): #Associate integer = 1
  return amplitude * np.sin(frequency*time + h_shift) + mean

def skewsin(time, mean, amplitude, frequency, h_shift): #Associate integer = -1
  character1 = amplitude * np.cos(frequency*time + h_shift)
  character2 = (amplitude / 5) * np.sin(2*frequency*time + h_shift)
  return  character1 - character2 + mean

# Assuming 'equation' is used to define the model for curve fitting
def trig(x, a, b, c):
    return a * np.sin(b * x + c)

def ntrigs(time, mean, *params):
    result = mean
    for i in range(0, len(params), 3):
        result += trig(time, params[i], params[i+1], params[i+2])
    return result

## Machine Learning.

In [3]:
def predict(time, flux, n_equation):
  if n_equation == 0:
    param = fit_linear(time, flux)
  else:
    param = fit_trig(time, flux, n_equation)
  return get_loss(n_equation, time, flux, param)

def fit_linear(time, power):
    time = np.array(time)
    power = np.array(power)
    regression = spicy.stats.linregress(time, power)
    slope = regression.slope
    intercept = regression.intercept
    return slope, intercept

def fit_trig(time, flux, n_equation):
    equation = get_equation(n_equation)
    number_of_characters = n_equation
    time = np.array(time)
    flux = np.array(flux)
    ff = np.fft.fftfreq(len(time), (time[1]-time[0]))   # assume uniform spacing, and apply Fast Fourier ff
    Fflux = abs(np.fft.fft(flux))
    guess_mean= np.mean(flux)
    guess_amp = np.std(flux) * 2.**0.5
    guess_freq = abs(ff[np.argmax(Fflux[1:])+1])   # excluding the zero frequency "peak", which is related to mean
    guess = [guess_mean, guess_amp, 2.*np.pi*guess_freq, 0.0]
    if n_equation > 1:
      for i in range(n_equation-1):
        guess.append(0)
        guess.append(guess_freq)
        guess.append(0)
    guess = np.array(guess)
    optimization, convolution = spicy.optimize.curve_fit(equation, xdata=time, ydata=flux, p0=guess, maxfev=50000)
    return optimization


In [4]:
def get_loss(n_equation, entries, values, parameters):
  float_parameters = []
  for parameter in parameters:
    float_parameters.append(float(parameter))
  parameters = float_parameters
  entries = np.array(entries)
  values = np.array(values)
  equation = get_equation(n_equation)
  prediction = equation(entries, *parameters)
  ptp = np.ptp(values)
  error = mean_absolute_error(values, prediction) / ptp * 100
  return error, *parameters

Also we can compare this star lightcurve with its periodogram.

## Start the predictions.
In this case we will make the calculation for both normalized and straightforward case. However to improve computation time in the future it's preferible to choose just one.

In [5]:
def start_predictions(end_number):
  information = []
  for index in range(1, end_number+1):
    try:
      dataframe = pd.read_csv(f"/content/drive/My Drive/Cepheids/fluxes/object{index}.csv")
    except FileNotFoundError:
      print("File "+str(index)+" was not found.")
      continue

    min_time = dataframe["time"].min()
    max_time = dataframe["time"].max()
    min_flux = dataframe["flux"].min()
    max_flux = dataframe["flux"].max()
    try:
      prediction_linearfunc = predict(dataframe["time"], dataframe["flux"], 0)
    except:
      prediction_linearfunc = [None, None, None]
    try:
      prediction_sinfunc = predict(dataframe["time"], dataframe["flux"], 1)
    except:
      prediction_sinfunc = [None, None, None, None, None]

    try:
      prediction_skewsin = predict(dataframe["time"], dataframe["flux"], -1)
    except:
      prediction_skewsin = [None, None, None, None, None]

    try:
      prediction_twotrigs = predict(dataframe["time"], dataframe["flux"], 2)
    except:
      prediction_twotrigs = [None, None, None, None, None, None, None, None]

    try:
      prediction_threetrigs = predict(dataframe["time"], dataframe["flux"], 3)
    except:
      prediction_threetrigs = [None, None, None, None, None, None, None, None, None, None, None]

    new_row = {"object":index,
               "min_time":min_time, "max_time":max_time, "min_flux":min_flux, "max_flux":max_flux,
               "linearfunc_error":prediction_linearfunc[0], "linearfunc_slope": prediction_linearfunc[1],
               "linearfunc_intercept": prediction_linearfunc[2],

               "sinfunc_error":prediction_sinfunc[0], "sinfunc_mean": prediction_sinfunc[1],
               "sinfunc_amplitude": prediction_sinfunc[2], "sinfunc_frequency": prediction_sinfunc[3], "sinfunc_h_shift": prediction_sinfunc[4],

               "skewsin_error":prediction_skewsin[0], "skewsin_mean": prediction_skewsin[1],
               "skewsin_amplitude": prediction_skewsin[2], "skewsin_frequency": prediction_skewsin[3], "skewsin_h_shift": prediction_skewsin[4],

               "twotrigs_error":prediction_twotrigs[0], "twotrigs_mean": prediction_twotrigs[1],
               "twotrigs_amplitude1": prediction_twotrigs[2], "twotrigs_frequency1": prediction_twotrigs[3],
               "twotrigs_hshift1": prediction_twotrigs[4], "twotrigs_amplitude2": prediction_twotrigs[5],
               "twotrigs_frequency2": prediction_twotrigs[6], "twotrigs_hshift2": prediction_twotrigs[7],

               "threetrigs_error": prediction_threetrigs[0], "threetrigs_mean": prediction_threetrigs[1],
               "threetrigs_amplitude1": prediction_threetrigs[2], "threetrigs_frequency1": prediction_threetrigs[3],
               "threetrigs_hshift1": prediction_threetrigs[4], "threetrigs_amplitude2": prediction_threetrigs[5],
               "threetrigs_frequency2": prediction_threetrigs[6], "threetrigs_hshift2": prediction_threetrigs[7],
               "threetrigs_amplitude3": prediction_threetrigs[8], "threetrigs_frequency3": prediction_threetrigs[9],
               "threetrigs_hshift3": prediction_threetrigs[10]
               }
    information.append(new_row)
  dataset = pd.DataFrame.from_dict(information)
  dataset.to_csv("/content/drive/My Drive/Cepheids/new_method_sample.csv", index = False)

start_predictions(20)


  optimization, convolution = spicy.optimize.curve_fit(equation, xdata=time, ydata=flux, p0=guess, maxfev=50000)
