In [1]:
#Importing libraries
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import StandardScaler
from sklearn.model_selection import train_test_split
from pyspark.ml import Pipeline
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, LSTM
import numpy as np
import pandas as pd
from tensorflow.keras.utils import to_categorical
import os




In [2]:
os.environ['JAVA_HOME'] = "C:\Program Files\Java\jdk-11"

In [3]:
#SparkSession
spark = SparkSession.builder \
    .appName("CodeExample") \
    .getOrCreate()

In [4]:
nasa = spark.read.csv("nasa.csv", header=True, inferSchema=True)

In [5]:
nasa.show(10)

+-----+-----+---+-------------------+-------+--------+-----+------------+--------+---------+------+------+---------+----------+-------+-------+----------+----------------+-------------+------+-----------+-----------+----+---------+---------+--------+--------+----------+----------+---------+--------+----------+---------+-------+---------+--------+---+--------+---------+--------+---------+---------+--------+---------+---------+----+----+-----+---+---+---+----+----+----+
| year|month|day|              td_ge|     dt|luna_num|saros|eclipse_type|   gamma|magnitude|lat_ge|lng_ge|lat_dd_ge| lng_dd_ge|sun_alt|sun_azm|path_width|central_duration|duration_secs|cat_no|canon_plate|julian_date|  t0|       x0|       x1|      x2|      x3|        y0|        y1|       y2|      y3|        d0|       d1|     d2|      mu0|     mu1|mu2|     l10|      l11|     l12|      l20|      l21|     l22|   tan_f1|   tan_f2|tmin|tmax|etype|PNS|UNS|NCN|nSer|nSeq|nJLE|
+-----+-----+---+-------------------+-------+--------+

In [6]:
nasa.tail(10)

[Row(year=2996, month=7, day=6, td_ge=datetime.datetime(2024, 4, 13, 23, 44, 3), dt=4395.4, luna_num=12325, saros=162, eclipse_type='A', gamma=0.5013, magnitude=0.9508, lat_ge='51.6N', lng_ge='145.6W', lat_dd_ge=51.58156, lng_dd_ge=-145.64313, sun_alt=59.7, sun_azm=198.9, path_width=208.3, central_duration='04m44s', duration_secs=284.4, cat_no=11889.0, canon_plate=595.0, julian_date=2815514.489, t0=0.0, x0=0.242102, x1=0.4963752, x2=-4.63e-05, x3=-5.63e-06, y0=0.459302, y1=-0.1119435, y2=-0.0001537, y3=1.4e-06, d0=22.294821, d1=-0.004573, d2=-5e-06, mu0=178.18694, mu1=14.99965, mu2=0.0, l10=0.564323, l11=4.12e-05, l12=-9.8e-06, l20=0.018095, l21=4.1e-05, l22=-9.7e-06, tan_f1=0.0046035, tan_f2=0.0045806, tmin=-3.0, tmax=3.0, etype=2, PNS=1, UNS=0, NCN=0, nSer=70, nSeq=42, nJLE=2),
 Row(year=2996, month=12, day=31, td_ge=datetime.datetime(2024, 4, 13, 12, 58, 17), dt=4399.0, luna_num=12331, saros=167, eclipse_type='T', gamma=-0.1729, magnitude=1.02488, lat_ge='32.9S', lng_ge='6.2E', lat_

In [7]:
num_rows = nasa.count()
num_columns = len(nasa.columns)
print("Shape of the DataFrame: ({}, {})".format(num_rows, num_columns))

Shape of the DataFrame: (11898, 54)


In [8]:
nasa.dtypes 

[('year', 'int'),
 ('month', 'int'),
 ('day', 'int'),
 ('td_ge', 'timestamp'),
 ('dt', 'double'),
 ('luna_num', 'int'),
 ('saros', 'int'),
 ('eclipse_type', 'string'),
 ('gamma', 'double'),
 ('magnitude', 'double'),
 ('lat_ge', 'string'),
 ('lng_ge', 'string'),
 ('lat_dd_ge', 'double'),
 ('lng_dd_ge', 'double'),
 ('sun_alt', 'double'),
 ('sun_azm', 'double'),
 ('path_width', 'double'),
 ('central_duration', 'string'),
 ('duration_secs', 'double'),
 ('cat_no', 'double'),
 ('canon_plate', 'double'),
 ('julian_date', 'double'),
 ('t0', 'double'),
 ('x0', 'double'),
 ('x1', 'double'),
 ('x2', 'double'),
 ('x3', 'double'),
 ('y0', 'double'),
 ('y1', 'double'),
 ('y2', 'double'),
 ('y3', 'double'),
 ('d0', 'double'),
 ('d1', 'double'),
 ('d2', 'double'),
 ('mu0', 'double'),
 ('mu1', 'double'),
 ('mu2', 'double'),
 ('l10', 'double'),
 ('l11', 'double'),
 ('l12', 'double'),
 ('l20', 'double'),
 ('l21', 'double'),
 ('l22', 'double'),
 ('tan_f1', 'double'),
 ('tan_f2', 'double'),
 ('tmin', 'doub

In [9]:
from pyspark.sql.functions import col, count
missing_values = nasa.agg(*[count(col(c)).alias(c) for c in nasa.columns])
missing_values.show()

+-----+-----+-----+-----+-----+--------+-----+------------+-----+---------+------+------+---------+---------+-------+-------+----------+----------------+-------------+------+-----------+-----------+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+------+------+-----+-----+-----+-----+-----+-----+-----+-----+-----+
| year|month|  day|td_ge|   dt|luna_num|saros|eclipse_type|gamma|magnitude|lat_ge|lng_ge|lat_dd_ge|lng_dd_ge|sun_alt|sun_azm|path_width|central_duration|duration_secs|cat_no|canon_plate|julian_date|   t0|   x0|   x1|   x2|   x3|   y0|   y1|   y2|   y3|   d0|   d1|   d2|  mu0|  mu1|  mu2|  l10|  l11|  l12|  l20|  l21|  l22|tan_f1|tan_f2| tmin| tmax|etype|  PNS|  UNS|  NCN| nSer| nSeq| nJLE|
+-----+-----+-----+-----+-----+--------+-----+------------+-----+---------+------+------+---------+---------+-------+-------+----------+----------------+-------------+------+-----------+-----------+-----+-----+----

In [10]:
from pyspark.sql.functions import col, stddev_pop, avg
from functools import reduce

numerical_cols = [col_name for col_name, data_type in nasa.dtypes if data_type != 'string' and data_type != 'timestamp']

summary_stats = nasa.select([stddev_pop(col(col_name)).alias(col_name + '_stddev') for col_name in numerical_cols] +
                            [avg(col(col_name)).alias(col_name + '_mean') for col_name in numerical_cols])

nasa_with_stats = nasa.crossJoin(summary_stats)

threshold = 3

filter_conditions = []
for col_name in numerical_cols:
    lower_bound = col(col_name + '_mean') - threshold * col(col_name + '_stddev')
    upper_bound = col(col_name + '_mean') + threshold * col(col_name + '_stddev')
    condition = (col(col_name) >= lower_bound) & (col(col_name) <= upper_bound)
    filter_conditions.append(condition)

filtered_nasa = nasa_with_stats.filter(reduce(lambda a, b: a & b, filter_conditions))

filtered_nasa = filtered_nasa.drop(*[col_name + '_stddev' for col_name in numerical_cols] +
                                    [col_name + '_mean' for col_name in numerical_cols])

filtered_nasa.show()

+-----+-----+---+-------------------+-------+--------+-----+------------+--------+---------+------+------+---------+----------+-------+-------+----------+----------------+-------------+------+-----------+-----------+----+---------+---------+--------+--------+----------+----------+---------+--------+----------+---------+-------+---------+--------+---+--------+---------+--------+---------+---------+--------+---------+---------+----+----+-----+---+---+---+----+----+----+
| year|month|day|              td_ge|     dt|luna_num|saros|eclipse_type|   gamma|magnitude|lat_ge|lng_ge|lat_dd_ge| lng_dd_ge|sun_alt|sun_azm|path_width|central_duration|duration_secs|cat_no|canon_plate|julian_date|  t0|       x0|       x1|      x2|      x3|        y0|        y1|       y2|      y3|        d0|       d1|     d2|      mu0|     mu1|mu2|     l10|      l11|     l12|      l20|      l21|     l22|   tan_f1|   tan_f2|tmin|tmax|etype|PNS|UNS|NCN|nSer|nSeq|nJLE|
+-----+-----+---+-------------------+-------+--------+

In [11]:
num_rows = filtered_nasa.count()
num_columns = len(filtered_nasa.columns)
print("Shape of the DataFrame: ({}, {})".format(num_rows, num_columns))

Shape of the DataFrame: (11602, 54)


In [12]:
columns_to_keep = [col for col in filtered_nasa.columns if col not in ['td_ge', 'lat_ge', 'lng_ge', 'central_duration', 'eclipse_type']]
filtered_nasa = filtered_nasa[columns_to_keep]

In [13]:
filtered_nasa.dtypes

[('year', 'int'),
 ('month', 'int'),
 ('day', 'int'),
 ('dt', 'double'),
 ('luna_num', 'int'),
 ('saros', 'int'),
 ('gamma', 'double'),
 ('magnitude', 'double'),
 ('lat_dd_ge', 'double'),
 ('lng_dd_ge', 'double'),
 ('sun_alt', 'double'),
 ('sun_azm', 'double'),
 ('path_width', 'double'),
 ('duration_secs', 'double'),
 ('cat_no', 'double'),
 ('canon_plate', 'double'),
 ('julian_date', 'double'),
 ('t0', 'double'),
 ('x0', 'double'),
 ('x1', 'double'),
 ('x2', 'double'),
 ('x3', 'double'),
 ('y0', 'double'),
 ('y1', 'double'),
 ('y2', 'double'),
 ('y3', 'double'),
 ('d0', 'double'),
 ('d1', 'double'),
 ('d2', 'double'),
 ('mu0', 'double'),
 ('mu1', 'double'),
 ('mu2', 'double'),
 ('l10', 'double'),
 ('l11', 'double'),
 ('l12', 'double'),
 ('l20', 'double'),
 ('l21', 'double'),
 ('l22', 'double'),
 ('tan_f1', 'double'),
 ('tan_f2', 'double'),
 ('tmin', 'double'),
 ('tmax', 'double'),
 ('etype', 'int'),
 ('PNS', 'int'),
 ('UNS', 'int'),
 ('NCN', 'int'),
 ('nSer', 'int'),
 ('nSeq', 'int'),


In [14]:
#Feature columns
feature_columns = ['year', 'month', 'day', 'luna_num', 'saros',
                   'gamma', 'magnitude',  'lat_dd_ge','lng_dd_ge', 'sun_alt', 'sun_azm', 
                   'path_width', 'duration_secs', 'cat_no', 'canon_plate', 
                   'julian_date', 't0', 'x0','x1', 'x2', 'x3', 'y0', 'y1', 
                   'y2', 'y3', 'd0', 'd1', 'd2', 'mu0','mu1', 'mu2', 'l10', 'l11', 
                   'l12', 'l20', 'l21', 'l22', 'tan_f1','tan_f2', 'tmin', 'tmax', 'etype', 
                   'PNS', 'UNS', 'NCN', 'nSer', 'nSeq','nJLE']

vectorAssembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
scaler = StandardScaler(inputCol="features", outputCol="scaled_features")

In [27]:
n_features = len(feature_columns)

#n_classes = 3 

max_label_value = y_train.max()
y_train_adjusted = y_train - y_train.min()
n_classes = max_label_value + 1  

In [28]:
#RNN model
model = Sequential([
    LSTM(64, input_shape=(n_features, 1)),
    Dense(64, activation='relu'),
    Dense(n_classes, activation='softmax')
])

model.compile(optimizer='adam',
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy'])

In [29]:
#Spark DataFrame to Pandas
pandas_df = filtered_nasa.toPandas()
X = pandas_df.drop(columns=['etype']).values  
y = pandas_df['etype'].values 

In [30]:
X = X.reshape(X.shape[0], X.shape[1], 1)

In [31]:
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)

In [32]:
print("Data type of X_train:", X_train.dtype)
print("Data type of y_train:", y_train.dtype)

Data type of X_train: float64
Data type of y_train: int32


In [33]:
model.fit(X_train, y_train, epochs=100, batch_size=32, validation_split=0.2)

Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Epoch 8/100
Epoch 9/100
Epoch 10/100
Epoch 11/100
Epoch 12/100
Epoch 13/100
Epoch 14/100
Epoch 15/100
Epoch 16/100
Epoch 17/100
Epoch 18/100
Epoch 19/100
Epoch 20/100
Epoch 21/100
Epoch 22/100
Epoch 23/100
Epoch 24/100
Epoch 25/100
Epoch 26/100
Epoch 27/100
Epoch 28/100
Epoch 29/100
Epoch 30/100
Epoch 31/100
Epoch 32/100
Epoch 33/100
Epoch 34/100
Epoch 35/100
Epoch 36/100
Epoch 37/100
Epoch 38/100
Epoch 39/100
Epoch 40/100
Epoch 41/100
Epoch 42/100
Epoch 43/100
Epoch 44/100
Epoch 45/100
Epoch 46/100
Epoch 47/100
Epoch 48/100
Epoch 49/100
Epoch 50/100
Epoch 51/100
Epoch 52/100
Epoch 53/100
Epoch 54/100
Epoch 55/100
Epoch 56/100
Epoch 57/100
Epoch 58/100
Epoch 59/100
Epoch 60/100
Epoch 61/100
Epoch 62/100
Epoch 63/100
Epoch 64/100
Epoch 65/100
Epoch 66/100
Epoch 67/100
Epoch 68/100
Epoch 69/100
Epoch 70/100
Epoch 71/100
Epoch 72/100
Epoch 73/100
Epoch 74/100
Epoch 75/100
Epoch 76/100
Epoch 77/100
Epoch 78

<keras.src.callbacks.History at 0x2b1e2de2a90>

In [34]:
loss, accuracy = model.evaluate(X_test, y_test)
print(f'Test Accuracy: {accuracy}')

Test Accuracy: 0.7713300585746765


In [None]:
spark.stop()

https://www.machinelearningplus.com/pyspark/pyspark-outlier-detection-and-treatment/
https://umbra.nascom.nasa.gov/eclipse/980226/tables/table_1.html
https://eclipse.gsfc.nasa.gov/SEbeselm/SEbeselm2051/SE2076Jan06Tbeselm.html