In [0]:
import requests
import json
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode
from pyspark.sql.types import StructType, StructField, StringType, BooleanType
from pyspark.sql.functions import col, when, avg, round

import pandas as pd
import time



In [0]:
# Obtain the notebook's Spark session: getOrCreate() returns the existing
# session when there is one and builds a new one otherwise.
spark = SparkSession.builder.getOrCreate()

# --- Step 1: Extract ---
def fetch_data(url, headers=None, retries=3, backoff=2):
    """Fetch a URL with GET and return its JSON body, retrying on failure.

    Args:
        url: Address to request.
        headers: Optional dict of HTTP headers sent with every attempt.
        retries: Maximum number of attempts before giving up.
        backoff: Base of the exponential wait; after failed attempt ``k``
            (1-based) the function sleeps ``backoff ** k`` seconds.

    Returns:
        The decoded JSON payload of the first successful response, or
        ``None`` when every attempt failed (or ``retries`` is 0).
    """
    for attempt in range(1, retries + 1):
        try:
            resp = requests.get(url, headers=headers, timeout=10)
            resp.raise_for_status()
            return resp.json()
        except (requests.RequestException, ValueError) as e:
            # RequestException covers connection errors, timeouts and HTTP
            # error statuses; ValueError covers a body that is not valid JSON.
            # Anything else is a programming error and should surface.
            print(f"Attempt {attempt} failed: {e}")
            # Only wait when another attempt will follow; sleeping after the
            # last failure just delays the caller's error handling.
            if attempt < retries:
                time.sleep(backoff ** attempt)
    return None

In [0]:
import os

region = "US-CA"
url = f"https://api.ebird.org/v2/data/obs/{region}/recent"

# The request carries an API token in the X-eBirdApiToken header. Read it
# from the environment so the secret is never stored in the notebook itself.
api_token = os.environ.get("EBIRD_API_TOKEN")
if not api_token:
    raise Exception(
        "EBIRD_API_TOKEN environment variable is not set; "
        "it must hold the eBird API token"
    )
headers = {"X-eBirdApiToken": api_token}

data = fetch_data(url, headers=headers)
# fetch_data returns None when every attempt failed.
if data is None:
    raise Exception("Failed to fetch data from eBird API")
# An empty payload is a successful call with nothing to load; stop here with
# an accurate message because the following cells need at least one record.
if not data:
    raise Exception(f"eBird API returned no recent observations for region {region}")

In [0]:
# Flatten the list of observation records into a pandas DataFrame.
pdf = pd.json_normalize(data)
# Show which fields the API actually returned.
print(pdf.columns)

# Keep only the fields used downstream and drop rows that are exact duplicates.
df_obs = pdf.loc[
    :,
    [
        'speciesCode', 'comName', 'sciName', 'obsDt',
        'howMany', 'lat', 'lng', 'locName',
    ],
].drop_duplicates()

# Hand the trimmed pandas frame over to Spark.
spark_df = spark.createDataFrame(df_obs)

Index(['speciesCode', 'comName', 'sciName', 'locId', 'locName', 'obsDt',
       'howMany', 'lat', 'lng', 'obsValid', 'obsReviewed', 'locationPrivate',
       'subId', 'exoticCategory'],
      dtype='object')


In [0]:
from pyspark.sql.functions import isnan

# A missing howMany can reach Spark as NaN after the pandas conversion.
# Casting NaN straight to integer yields 0 (or an error under ANSI mode), which
# would be indistinguishable from a real count, so map NaN to NULL first.
how_many_dbl = col("howMany").cast("double")
observed_count = when(~isnan(how_many_dbl), how_many_dbl).cast("integer")

# Rename the API fields to descriptive column names and fix their types in a
# single projection. Note: this rebinds spark_df, so the cell cannot be re-run
# without re-running the cell that creates spark_df.
spark_df = spark_df.select(
    "speciesCode",
    col("comName").alias("common_name"),
    col("sciName").alias("scientific_name"),
    col("obsDt").alias("observation_date"),
    observed_count.alias("count"),
    col("lat").cast("double").alias("latitude"),
    col("lng").cast("double").alias("longitude"),
    col("locName").alias("location_name"),
)

In [0]:
# Make sure the target schema exists before anything is written into it.
create_schema_sql = """
CREATE SCHEMA IF NOT EXISTS ebird_db
COMMENT 'Database for eBird observations'
"""
spark.sql(create_schema_sql)

# Replace the raw observations table with the freshly prepared data.
(
    spark_df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("ebird_db.ebird_recent_observations")
)

print("ETL for eBird API complete.")

ETL for eBird API complete.


In [0]:
from pyspark.sql.functions import avg, col, count as f_count, to_date

# Read the raw observations back and turn the date column into a DATE.
df = spark.table("ebird_db.ebird_recent_observations")
df = df.withColumn("observation_date", to_date(col("observation_date")))

# Row count per group, shared by all three aggregations below.
num_observations = f_count("*").alias("num_observations")

# Aggregation 1: how often each species was observed and its average count.
species_count = (
    df.groupBy("common_name", "scientific_name")
    .agg(num_observations, avg("count").alias("avg_count"))
    .orderBy(col("num_observations").desc())
)

# Aggregation 2: number of observations per date, oldest first.
observations_by_date = (
    df.groupBy("observation_date")
    .agg(num_observations)
    .orderBy("observation_date")
)

# Aggregation 3: number of observations per location, busiest first.
observations_by_location = (
    df.groupBy("location_name")
    .agg(num_observations)
    .orderBy(col("num_observations").desc())
)

# Persist each result as a Delta table, replacing any previous contents.
for table_name, result_df in (
    ("ebird_db.analytics_species_count", species_count),
    ("ebird_db.analytics_observations_by_date", observations_by_date),
    ("ebird_db.analytics_observations_by_location", observations_by_location),
):
    result_df.write.format("delta").mode("overwrite").saveAsTable(table_name)
