In [0]:
# Step 1: Import the required libraries and define the API parameters 

import requests
import json
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, col, round
spark = SparkSession.builder.getOrCreate()
""" 
Define API parameters.
I have used url and params instead of directly using url - https://api.nasa.gov/neo/rest/v1/feed?start_date=2015-09-07&end_date=2015-09-08&api_key=DEMO_KEY for a Easier to read and modify and less prone to error approach. 
"""
url = "https://api.nasa.gov/neo/rest/v1/feed"
params = {
    "start_date": "2015-09-07",
    "end_date": "2015-09-08",
    "api_key": "DEMO_KEY"
}

# Get data
response = requests.get(url, params=params)
data = response.json()


In [0]:
"""
Step 2 : 
Explore the data
the data is a dictionary where:
Keys are date strings and values are lists of near earth object (NEO) dictionaries observed on that date.
"""
print(data["near_earth_objects"].keys())
print((data["near_earth_objects"]["2015-09-07"]))

In [0]:
# Step 3: Extract objs data and transform to a Spark DataFrame

# Initialize Spark session
spark = SparkSession.builder.getOrCreate()

# Extract Objs by date
objs_by_date = data['near_earth_objects']

# Flatten data for simple view and easy to work: create list of all objs
objs_list = []
for date, objs in objs_by_date.items():
    for obj in objs:
        obj['date'] = date  # keeping the date information as it is needed.
        objs_list.append(obj)

# Use pandas to flatten list of dicts and then convert to Spark DataFrame for large dataset/parallel processing and use SQL support
objs_pd_df = pd.json_normalize(objs_list)
objs_df = spark.createDataFrame(objs_pd_df)

# check schema and first value 
objs_df.printSchema() 
print(objs_df.first())

In [0]:
""" 
Step 4: All near earth objects
Object Table (objects) answers - How many asteroid or comet?
Each row will have 1 unique object, regardless of how many times it comes near Earth.
This is a object level data capture.
Create table from Spark DataFrame for All near earth objects (taking only unique objects into consideration).
selecting only the important columns, this can be changed.
"""
objs_flat_df = objs_df.select(
    col("id"),
    col("name"),
    col("nasa_jpl_url"),
    col("absolute_magnitude_h").cast("double"),
    col("is_potentially_hazardous_asteroid").cast("boolean"),
    col("is_sentry_object").cast("boolean"),
    round(col("`estimated_diameter.meters.estimated_diameter_min`").cast("double"),2).alias("estimated_diameter_min_m"),
    round(col("`estimated_diameter.meters.estimated_diameter_max`").cast("double"),2).alias("estimated_diameter_max_m")
)

# Removing duplicate obj ID
objs_distinct_df = objs_flat_df.dropDuplicates(["id"])

objs_distinct_df.show(5)

# Save it to as a SQL table
spark.sql("DROP TABLE IF EXISTS objects")
objs_distinct_df.write.mode("overwrite").option("mergeSchema", "true").saveAsTable("objects")

In [0]:
""" Step 5 : All instances of a close approach
Close Approach Table (obj_close) answeres - When and how close did this obj come to Earth?
Each row will have  1 specific approach. A single obj can have multiple rows (approaches) over time.
This is a event-level data capture"""

close_df = objs_df.select(
    col("id"),
    col("name"),
    explode("close_approach_data").alias("approach")
)
close_df.show(5)

# Create table from Spark DataFrame for All close approach objects and selecting only the important columns, this can be changed.
close_flat_df = close_df.select(
    col("id"),
    col("name"),
    col("approach.close_approach_date"),
    col("approach.close_approach_date_full"),
    col("approach.epoch_date_close_approach"),
    round(col("approach.relative_velocity.kilometers_per_hour").cast("double"), 2).alias("kmph"),
    round(col("approach.miss_distance.kilometers").cast("double"), 2).alias("miss_distance_km"),
    col("approach.orbiting_body")
)

close_flat_df.show(5)

close_flat_df.write.mode("overwrite").saveAsTable("obj_close_approaches")



In [0]:

#Step 6: Analyze the tables
# View first few near earth objects
display(spark.sql("SELECT * FROM objects LIMIT 10"))

# View first few close approach events
display(spark.sql("SELECT * FROM obj_close_approaches LIMIT 10"))

# **USER STORY 1**

The ask -
Build an analysis system that helps amateur astronomers understand near-Earth objects (NEOs)

Requirements         
- Show size metrics, orbital period calculations,
and hazard classifications.
- Show past and future approaches	Show NEOs that came close to Earth in the past 25 years or will in the next 25 years
- The system should categorize objects based on their
hazard potential using established scientific criteria.
- Let users filter/sort	By size, hazard, time, orbiting body, etc.
- Show visual charts	Charts to compare size vs distance, speed, etc.
- Show detailed profiles

## Part 1

In [0]:
%sql
--Show detailed information like - including size metrics, orbital period calculations, and hazard classifications.
--CREATE OR REPLACE TEMP VIEW neo_summary AS
SELECT
  id,
  name,
  ROUND((estimated_diameter_min_m + estimated_diameter_max_m)/2, 4) AS average_diameter_meters,
  absolute_magnitude_h AS magnitude,
  is_sentry_object,
  CASE 
    WHEN is_potentially_hazardous_asteroid THEN 'Hazardous'
    ELSE 'Non-Hazardous'
  END AS hazard_category
FROM objects;

-- Orbital period (usually in days or years) is how long the object takes to orbit the Sun. To find this we can use the formulla T = sqrt(a^3) where T is the orbital period and a is the semi-major axis of the orbit. 


## Part 2

In [0]:
%sql
-- Displaying objects that had close approaches to Earth in the past 25 years or are predicted to approach in the next 25 years.
-- This is done taking today's date and subtracting 25 years and adding 25 years to get the timeframe.
SELECT  
  id,
  name,
  close_approach_date,
  kmph AS speed_kmph,
  miss_distance_km,
  orbiting_body
FROM obj_close_approaches
WHERE 
  to_date(close_approach_date, 'yyyy-MM-dd') BETWEEN date_sub(current_date(), 365 * 25) -- current - 25 years
                                                 AND date_add(current_date(), 365 * 25) -- current + 25 years
ORDER BY close_approach_date;

## Part 3

In [0]:
%sql 
/* 
A NEO is further designated as a potential hazard if it meets two specific criteria:
1. Orbital Intersection Distance: Its orbit intersects Earth's orbit with a minimum orbit intersection distance (MOID) of less than 0.05    astronomical units (AU), which is approximately 4.65 million miles or 7.48 million kilometers. 
2. Absolute Magnitude: Its absolute magnitude (a measure of brightness) is 22 or less, which generally corresponds to a diameter greater than 140 meters (460 feet). 

Since we already have a field is_potentially_hazardous_asteroid; we can use this field for classification.
We can also manually calcuate based on the above two conditions using the close_approach_data.miss_distance.astronomical and absolute_magnitude_h fields using the condition - 
CASE
    WHEN CAST(close_approach_data[0].miss_distance.astronomical AS DOUBLE) <= 0.05
         AND absolute_magnitude_h <= 22 THEN 'Potentially Hazardous'
    ELSE 'Not Hazardous' 
 */

SELECT
  id,
  name,
  CASE
    WHEN is_potentially_hazardous_asteroid THEN 'Hazardous'
    ELSE 'Non-Hazardous'
  END AS hazard_category
FROM objects;


## Part 4 and Part 5

In [0]:
"""
Visualize the data. First we join the two tables using SQL and then convert the df to pandas.

"""
joined_df = spark.sql("""
SELECT * FROM objects o JOIN obj_close_approaches c ON o.id = c.id
""")
joined_pd = joined_df.toPandas()
joined_pd['avg_diameter_m'] = (joined_pd['estimated_diameter_min_m'] + joined_pd['estimated_diameter_max_m']) / 2
joined_pd['approach_day'] = joined_pd['close_approach_date'].str[8:10].astype(int)

display(joined_pd)

plt.figure(figsize=(8,6))
sns.scatterplot(
    data=joined_pd, 
    x='avg_diameter_m', 
    y='miss_distance_km',
)
plt.title('Size vs. Approach Distance')
plt.xlabel('Estimated Diameter (meters)')
plt.ylabel('Miss Distance (km)')
plt.show()

plt.figure(figsize=(10,6))
sns.histplot(joined_pd['approach_day'], discrete=True) 
plt.title('Number of Close Approaches per day')
plt.xlabel('Day')
plt.ylabel('Number of Approaches')
plt.xticks(range(1,32))
plt.grid(axis='y')
plt.show()




Databricks visualization. Run in Databricks to view.

Databricks visualization. Run in Databricks to view.

In [0]:
def neo_info (obj_id : str):
    details = spark.sql(f"""
    SELECT * FROM objects o JOIN obj_close_approaches c ON o.id = c.id
    where o.id = '{obj_id}'
    ORDER BY c.close_approach_date
    """)
    return details.toPandas()

neo_info("3426410")

# USER STORY 2

%md
The ask -
Analysis of NEO close approaches frequency

Requirements         
- Statistical analysis of NEO close approaches in a time period
- Filter by object size ranges and show frequency distribution
- Categorize objects by minimum distance during close approach
- Identify and analyze "potentially hazardous" objects separately
- Visualize trends and comparative analysis (hazardous vs non-hazardous)

## Part 1 to 5
The system should provide statistical analysis of near-Earth
object (NEO) close approaches over a defined time period.

When filtering by object size ranges, the system should
show frequency distribution accordingly.

The system should categorize objects by their minimum
distance from Earth during close approaches.

The system should clearly identify which objects are
classified as "potentially hazardous" and analyse their
frequency separately.

The system should provide comparative analysis between
potentially hazardous and non-hazardous objects.


In [0]:
"""
 First we join the two tables using SQL and then convert the df to pandas. Since it is already done we directly use the DF
"""
print("Total Close Approaches:", len(joined_pd))
print("Unique NEOs:", joined_pd['id'].nunique())
print("Hazardous NEOs:", joined_pd['is_potentially_hazardous_asteroid'].sum())
print("Non-Hazardous NEOs:",(joined_pd['is_potentially_hazardous_asteroid'] == False).sum())
print("Average Diameter (meters):", joined_pd['avg_diameter_m'].mean())
print("Average speed (kmph):", joined_pd['kmph'].mean())
print("Average magnitute (h):", joined_pd['absolute_magnitude_h'].mean())
print("Date Range:", joined_pd['close_approach_date'].min(), "to", joined_pd['close_approach_date'].max())

display(joined_pd)


Databricks visualization. Run in Databricks to view.

Databricks visualization. Run in Databricks to view.

Databricks visualization. Run in Databricks to view.

Databricks visualization. Run in Databricks to view.