In [None]:
from astroquery.nasa_exoplanet_archive import NasaExoplanetArchive
import pandas as pd

# --- Archive query -----------------------------------------------------------
# Conditions a KOI must satisfy to count as a clean confirmed exoplanet.
# (The archive knows four dispositions: Confirmed, Candidate, Not Dispositioned
# and False Positive.)
confirmed_conditions = [
    "koi_disposition = 'CONFIRMED'",  # the planet must be confirmed
    "koi_depth > 500",                # there must be a visible signal
    "koi_model_snr > 100",            # the signal must be strong
    "koi_fpflag_nt = 0",              # none of the four false-positive flags may be set
    "koi_fpflag_ss = 0",
    "koi_fpflag_co = 0",
    "koi_fpflag_ec = 0",
]
where_con = " AND ".join(confirmed_conditions)

# Conditions for the negative class: false positives flagged as "not a transit".
false_conditions = [
    "koi_disposition = 'FALSE POSITIVE'",
    "koi_fpflag_nt = 1",
]
where_fal = " AND ".join(false_conditions)

# Columns requested for both classes.
koi_columns = ", ".join([
    "kepid",
    "koi_period",
    "koi_duration",
    "koi_time0bk",
    "koi_depth",
    "koi_model_snr",
    "koi_disposition",
])


def _fetch_kois(where_clause):
    """Query the "cumulative" table with ``where_clause`` and return a DataFrame.

    Rows that have any empty value in the requested columns are dropped.
    """
    table = NasaExoplanetArchive.query_criteria(
        table="cumulative",
        select=koi_columns,
        where=where_clause,
    )
    return table.to_pandas().dropna()


# Search the archive for the exoplanets, then for the false positives
confirmed = _fetch_kois(where_con)
false_positives = _fetch_kois(where_fal)

# Keep a single KOI per star: the one with the strongest signal (highest model SNR).
# Each frame holds exactly one disposition, so sorting on koi_disposition cannot rank
# the KOIs of a star; sorting on koi_model_snr (descending) is what puts the strongest
# detection first so that keep="first" selects it.
confirmed_unique = (
    confirmed
    .sort_values("koi_model_snr", ascending=False)
    .drop_duplicates(subset="kepid", keep="first")
)
false_unique = (
    false_positives
    .sort_values("koi_model_snr", ascending=False)
    .drop_duplicates(subset="kepid", keep="first")
)

# Remove stars that appear in both classes, so one light curve never carries two labels
overlap_kepids = set(confirmed_unique["kepid"]) & set(false_unique["kepid"])
confirmed_clean = confirmed_unique[~confirmed_unique["kepid"].isin(overlap_kepids)]
false_clean = false_unique[~false_unique["kepid"].isin(overlap_kepids)]

# Take the same number of rows from each class. 125 is the target; if either class has
# fewer usable rows, shrink the sample for BOTH classes so the dataset stays balanced
# (DataFrame.sample would otherwise raise when n exceeds the number of rows).
target_per_class = 125
sample_size = min(target_per_class, len(confirmed_clean), len(false_clean))
if sample_size < target_per_class:
    print(
        f"Warning: only {sample_size} rows available per class "
        f"(wanted {target_per_class}; confirmed={len(confirmed_clean)}, "
        f"false positives={len(false_clean)})."
    )
confirmed_sample = confirmed_clean.sample(n=sample_size, random_state=42)
false_sample = false_clean.sample(n=sample_size, random_state=42)

# Merge and shuffle the dataset (fixed seed so the CSV is reproducible)
balanced_df = pd.concat([confirmed_sample, false_sample], ignore_index=True)
balanced_df = balanced_df.sample(frac=1, random_state=42).reset_index(drop=True)

# Save the dataset as a csv for later use
balanced_df.to_csv("midSet.csv", index=False)



In [None]:
%matplotlib inline
import lightkurve as lk
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd

# Init
kepid = 757450 # The id to test
curve_length = 100 # How many points should the final curve be
buffer_factor = 2 # Width of the window kept around the transit, in transit durations

df = pd.read_csv("midSet.csv") # Load the dataset
row = df.loc[df["kepid"] == kepid] # Dataset row(s) for the test target
if row.empty:
    # midSet.csv is a random sample, so the hard-coded target is not guaranteed to be in it
    raise ValueError(f"KIC {kepid} is not in midSet.csv; choose a kepid that is in the dataset.")
period = row.iloc[0]["koi_period"]
t0 = row.iloc[0]["koi_time0bk"]
dur = row.iloc[0]["koi_duration"] # Divided by 24 below, i.e. treated as hours
print(row) # Log the info

# test with KIC 757450 (Clear signal)
# Download the light curves and "stitch" them together
search_result = lk.search_lightcurve("KIC " + str(kepid), mission="Kepler", cadence = "long", limit = 6)
collection = search_result.download_all()
if collection is None:
    # download_all() gives back None instead of raising when the search found nothing
    raise RuntimeError(f"No Kepler long-cadence light curves found for KIC {kepid}.")
lc = collection.stitch()
lc.scatter() # Plot a scatter graph of the light curve
lc = lc.remove_nans()

# Flatten the light curve
lc_flat = lc.flatten()
lc_flat.scatter()

# Fold the light curve on the period
lc_fold = lc_flat.fold(period, epoch_time=t0)
lc_fold.scatter()
print(len(lc_fold))

# Create a mask to isolate the curve: keep points whose phase (in days) lies within
# buffer_factor * half the transit duration on either side of phase 0.
# The phase is converted to plain day values so the comparison is float-vs-float
# instead of relying on an implicit conversion of the bare numbers.
half_window_days = (dur / 24 * 0.5) * buffer_factor
phase_days = lc_fold.phase.to_value("jd")
phase_mask = np.abs(phase_days) < half_window_days

# Isolate the curve
lc_zoom = lc_fold[phase_mask]
lc_zoom.scatter()

# Interpolate the curve onto `curve_length` evenly spaced points
time = lc_zoom.time.value
flux = lc_zoom.flux.value
if len(time) < 2:
    raise ValueError(f"Only {len(time)} point(s) fall inside the transit window; cannot resample.")
order = np.argsort(time) # np.interp requires increasing x values
time, flux = time[order], flux[order]
t_min, t_max = time.min(), time.max()
if not t_max > t_min:
    raise ValueError("All points in the transit window share the same phase; cannot resample.")
time_norm = (time - t_min) / (t_max - t_min)
new_time_norm = np.linspace(0, 1, curve_length)
new_flux = np.interp(new_time_norm, time_norm, flux)
new_time = np.linspace(t_min, t_max, curve_length)
int_lc = lk.LightCurve(time = new_time, flux = new_flux)
int_lc.scatter()

# Normalize the curve so that flux points are between 0 and 1
min_flux = np.min(new_flux)
max_flux = np.max(new_flux)
if not max_flux > min_flux:
    # A constant (or NaN) curve would turn into NaN/inf after the division below
    raise ValueError("Resampled flux has no variation; cannot scale it to [0, 1].")
scaled_flux = (new_flux - min_flux) / (max_flux - min_flux)

lc_norm = lk.LightCurve(flux=scaled_flux, time=new_time)
lc_norm.scatter()


In [None]:
%matplotlib inline
import lightkurve as lk
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import os

# WARNING, THIS TAKES A VERY LONG TIME, HOURS TO DOWNLOAD FROM THE ARCHIVE
# Init
df = pd.read_csv("midSet.csv")
kepids = df["kepid"].unique().tolist()
curve_length = 100
buffer_factor = 2

# Directory the processed curves are written to.
# NOTE(review): the training and image cells read from "curvesNorm/", not "curvesBig/";
# confirm which directory is intended before a fresh Run All.
output_dir = "curvesBig"
os.makedirs(output_dir, exist_ok=True) # np.savez does not create missing folders


def _extract_transit_curve(kepid, period, t0, dur, curve_length, buffer_factor):
    """Download one target and reduce it to a fixed-length, [0, 1]-scaled transit curve.

    Steps: download and stitch the Kepler long-cadence light curves, drop NaNs,
    flatten, fold on ``period`` around ``t0``, keep the points within
    ``buffer_factor`` * half of ``dur`` (``dur`` is divided by 24, i.e. treated as
    hours) on either side of phase 0, resample to ``curve_length`` evenly spaced
    points and min-max scale the flux.

    Returns:
        (new_time, scaled_flux): two arrays of length ``curve_length``.

    Raises:
        RuntimeError: if the archive search returns no light curves.
        ValueError: if too few points fall in the window, or the window has no
            time span or no flux variation (scaling would divide by zero).
    """
    search_result = lk.search_lightcurve("KIC " + str(kepid), mission="Kepler", cadence = "long", limit = 10, author="Kepler")
    collection = search_result.download_all()
    if collection is None:
        # download_all() gives back None instead of raising when the search found nothing
        raise RuntimeError("no light curves found")
    lc = collection.stitch().remove_nans()

    # Flatten, then fold on the orbital period
    lc_fold = lc.flatten().fold(period, epoch_time=t0)

    # Isolate the transit; compare plain day values rather than a time object vs floats
    half_window_days = (dur / 24 * 0.5) * buffer_factor
    phase_mask = np.abs(lc_fold.phase.to_value("jd")) < half_window_days
    lc_zoom = lc_fold[phase_mask]

    time = lc_zoom.time.value
    flux = lc_zoom.flux.value
    if len(time) < 2:
        raise ValueError(f"only {len(time)} point(s) inside the transit window")
    order = np.argsort(time) # np.interp requires increasing x values
    time, flux = time[order], flux[order]

    # Interpolate onto an even grid
    t_min, t_max = time.min(), time.max()
    if not t_max > t_min:
        raise ValueError("transit window has zero time span")
    time_norm = (time - t_min) / (t_max - t_min)
    new_flux = np.interp(np.linspace(0, 1, curve_length), time_norm, flux)
    new_time = np.linspace(t_min, t_max, curve_length)

    # Min-max scale the flux to [0, 1]
    min_flux = np.min(new_flux)
    max_flux = np.max(new_flux)
    if not max_flux > min_flux:
        raise ValueError("resampled flux has no variation")
    scaled_flux = (new_flux - min_flux) / (max_flux - min_flux)
    return new_time, scaled_flux


# Download all the curves. A failure on one target is logged and the loop moves on,
# so a single bad target does not throw away hours of downloading. Already-saved
# targets are skipped, which makes the cell safe to re-run.
failed_kepids = []
for kepid in kepids:
    out_path = os.path.join(output_dir, f"{kepid}.npz")
    if os.path.exists(out_path):
        print(f"File {kepid}.npz already exists. Skipping download.")
        continue  # Skip if the file already exists
    print("Downloading: " + str(kepid))

    target = df.loc[df["kepid"] == kepid].iloc[0]
    try:
        new_time, scaled_flux = _extract_transit_curve(
            kepid,
            period=target["koi_period"],
            t0=target["koi_time0bk"],
            dur=target["koi_duration"],
            curve_length=curve_length,
            buffer_factor=buffer_factor,
        )
    except Exception as exc:  # deliberate: network/archive/processing errors all mean "skip this target"
        print(f"Failed: {kepid} ({type(exc).__name__}: {exc})")
        failed_kepids.append(kepid)
        continue

    # Save the flux and time; the kepid is used as an identifier in the file name
    np.savez(out_path, time=new_time, flux=scaled_flux)
    print("Downloaded: " + str(kepid))

if failed_kepids:
    print(f"{len(failed_kepids)} target(s) could not be processed: {failed_kepids}")
    


In [None]:
import os
import numpy as np
import pandas as pd
import lightkurve as lk
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
import seaborn as sns
import tensorflow as tf
from sklearn.metrics import confusion_matrix, f1_score, precision_score, recall_score

# Load the dataset
df = pd.read_csv("midSet.csv")
label_map = {"CONFIRMED": 1, "FALSE POSITIVE": 0} # Map the labels to 1 and 0 as this is what NN accepts

# Define curve length for input shape
curve_length = 100

# Create empty lists for curves and labels
X = []
Y = []

# Add curves and labels to lists
print("Loading light curves...")
for _, row in df.iterrows():
    kepid = row["kepid"]
    disposition = row["koi_disposition"]
    if disposition not in label_map:
        # Without this check an unexpected disposition would become a None label
        raise ValueError(f"Unexpected koi_disposition {disposition!r} for kepid {kepid}")

    # NpzFile keeps the file open until closed; the with-block releases it per curve
    with np.load(f"curvesNorm/{kepid}.npz") as data:
        flux = data["flux"]

    # A curve of the wrong length cannot be stacked, and NaN/inf values would
    # turn the training loss into NaN, so such curves are left out.
    if flux.shape != (curve_length,) or not np.isfinite(flux).all():
        print(f"Skipping {kepid}: expected {curve_length} finite flux values, got shape {flux.shape}.")
        continue

    X.append(flux)
    Y.append(label_map[disposition])

print(f"Loaded {len(X)} light curves.")
if not X:
    raise RuntimeError("No usable light curves were loaded from curvesNorm/.")

# Make lists into numpy arrays to be used.
# X gets an explicit trailing channel axis -> (samples, curve_length, 1), which is
# the input shape the LSTM below declares.
X = np.array(X)[..., np.newaxis]
Y = np.array(Y)

# Fix the random seeds so the data split and the initial weights are repeatable
# between runs (some GPU kernels may still introduce small run-to-run differences).
SEED = 42
np.random.seed(SEED)
tf.random.set_seed(SEED)

# Hold out 20% for testing, then 20% of the remainder for validation
# (64% train / 16% validation / 20% test). Stratifying keeps the
# exoplanet / false-positive ratio the same in every subset.
X_train, X_test, Y_train, Y_test = train_test_split(
    X, Y, test_size=0.2, random_state=SEED, stratify=Y
)
X_train, X_val, Y_train, Y_val = train_test_split(
    X_train, Y_train, test_size=0.2, random_state=SEED, stratify=Y_train
)

# The structure of the RNN
print("Building model...")
model = Sequential([
    LSTM(128, return_sequences=True, input_shape=(curve_length, 1)), # First LSTM layer; returns the full sequence so the next LSTM can consume it
    Dropout(0.3), # Dropout layer
    LSTM(64), # Second LSTM layer; returns only its final output
    Dense(1, activation='sigmoid') # Sigmoid output layer for binary classifier
])

# Compile the model for binary cross entropy
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

# Train the model for 50 epochs
print("Training model...")
history = model.fit(
    X_train,
    Y_train,
    epochs=50,
    batch_size=32,
    validation_data=(X_val, Y_val),
    verbose=1
)

# Log the training and testing accuracy
scores = model.evaluate(X_train,Y_train)
print("Training Accuracy: %.2f%%\n" % (scores[1]*100))

scores = model.evaluate(X_test,Y_test)
print("Testing Accuracy: %.2f%%\n" % (scores[1]*100))

# Have the model predict the classes of the test data
Y_pred = model.predict(X_test)

# The model output is already a sigmoid probability; threshold it at 0.5
# to get hard class labels (1 = exoplanet, 0 = false positive)
Y_predSig = (Y_pred[:, 0] >= 0.5).astype(int)

# Plot the accuracy over the training period
plt.plot(history.history["accuracy"], label = "Training Accuracy")
plt.plot(history.history["val_accuracy"], label = "Validation Accuracy")
plt.xlabel("Epoch")
plt.ylabel("Accuracy")
plt.legend()
plt.show()

# Plot the loss over the training period
plt.plot(history.history["loss"], label = "Training Loss", c = "green")
plt.plot(history.history["val_loss"], label = "Validation Loss", c = "Red")
plt.xlabel("Epoch")
plt.ylabel("Loss")
plt.legend()
plt.show()

print("Precision Score:" + str(precision_score(Y_test, Y_predSig , average="macro")))
print("Recall Score:" + str(recall_score(Y_test, Y_predSig , average="macro")))
print("F1 Score:" + str(f1_score(Y_test, Y_predSig , average="macro")))

# Create a confusion matrix from the actual classes and the predicted classes.
# By default the classes are ordered 0, 1 (false positive first), which would not
# match the headers below; labels=[1, 0] puts the exoplanet class first and also
# guarantees a 2x2 matrix even if one class is missing from the test split.
cm = confusion_matrix(Y_test, Y_predSig, labels=[1, 0])

# Confusion matrix headers (same order as `labels` above)
cm_df = pd.DataFrame(cm,
                     index = ["Exoplanet","False Positive"],
                     columns = ["Exoplanet","False Positive"])

# Plot the Confusion Matrix (fmt="d" shows the counts as whole numbers)
sns.heatmap(cm_df, annot=True, fmt="d")
plt.title('Confusion Matrix')
plt.ylabel('Actual Values')
plt.xlabel('Predicted Values')
plt.show()

# Save the trained model, allows for future use
model.save("exoV3.keras")

In [None]:
%matplotlib inline
import matplotlib.pyplot as plt
import lightkurve as lk
import numpy as np
import pandas as pd

import os  # needed in this cell to create the output folders

# Save images of all curves (Useful for dataset analysis)
df = pd.read_csv("midSet.csv")

# Where the image for each disposition goes; rows with any other disposition are skipped
image_dirs = {
    "CONFIRMED": "curveImg/confirmed",
    "FALSE POSITIVE": "curveImg/falsePositive",
}
for folder in image_dirs.values():
    os.makedirs(folder, exist_ok=True) # savefig does not create missing folders

for _, row in df.iterrows():
    kepid = row["kepid"]
    label = row["koi_disposition"]

    folder = image_dirs.get(label)
    if folder is None:
        continue

    # NpzFile keeps the file open until closed; the with-block releases it per curve
    with np.load(f"curvesNorm/{kepid}.npz") as data:
        time = data["time"]
        flux = data["flux"]

    lc = lk.LightCurve(time=time,flux=flux)
    save = lc.scatter()
    save.figure.savefig(f"{folder}/{kepid}.png")

    # Close the figure once it is on disk: otherwise every curve leaves an open
    # figure behind, which piles up in memory (and in the cell output).
    plt.close(save.figure)



    
