### OPEN AQ DATA

In [0]:
import requests
import pandas as pd
from pyspark.sql import SparkSession

import os

# Fetch the latest OpenAQ measurements for a set of locations and show them as
# a Spark DataFrame.
#
# The API key is read from the OPENAQ_API_KEY environment variable rather than
# being embedded in the notebook, so the credential is not shared or committed
# together with the file.
API_KEY = os.environ.get("OPENAQ_API_KEY")
if not API_KEY:
    raise ValueError("Set the OPENAQ_API_KEY environment variable to your OpenAQ API key")
headers = {"X-API-Key": API_KEY}

# Seconds to wait on each HTTP call; requests waits indefinitely by default.
REQUEST_TIMEOUT = 30

# Obtain the Spark session explicitly, as the other cells in this notebook do.
spark = SparkSession.builder.getOrCreate()

# Step 1: Get all locations in India (change country if needed)
# NOTE(review): the OpenAQ v3 docs appear to list `iso` / `countries_id` as the
# country filters — confirm that `country` is actually honored by this endpoint.
loc_url = "https://api.openaq.org/v3/locations"
params = {"country": "IN", "limit": 50}   # fetch up to 50 locations
resp = requests.get(loc_url, params=params, headers=headers, timeout=REQUEST_TIMEOUT)

if resp.status_code != 200:
    raise ValueError(f"Error fetching locations: {resp.status_code}, {resp.text}")

locs = resp.json()

all_records = []

# Step 2: Loop through location IDs and collect one record per latest reading
for loc in locs.get("results", []):
    location_id = loc["id"]
    location_name = loc.get("name")

    latest_url = f"https://api.openaq.org/v3/locations/{location_id}/latest"
    try:
        latest_resp = requests.get(latest_url, headers=headers, timeout=REQUEST_TIMEOUT)
    except requests.RequestException as exc:
        # Best effort: a timeout or connection error on one location must not
        # abort the whole run, mirroring the non-200 handling below.
        print(f"⚠️ Skipping {location_id} ({location_name}) due to request failure: {exc}")
        continue

    if latest_resp.status_code != 200:
        print(f"⚠️ Skipping {location_id} ({location_name}) due to error {latest_resp.status_code}")
        continue

    latest_json = latest_resp.json()

    # A missing, null, or empty "results" entry all mean "nothing to record".
    latest_results = latest_json.get("results") or []

    for item in latest_results:
        # `or {}` also covers keys that are present but null, which a
        # .get(key, {}) default does not.
        # NOTE(review): the v3 `latest` payload appears to carry timestamps
        # under "datetime" rather than "date"; both are accepted here. Also
        # confirm that "parameter" / "unit" (item) and "country" / "city"
        # (location) exist in the v3 schema.
        when = item.get("date") or item.get("datetime") or {}
        coords = item.get("coordinates") or {}
        all_records.append({
            "location_id": location_id,
            "location_name": location_name,
            "parameter": item.get("parameter"),
            "value": item.get("value"),
            "unit": item.get("unit"),
            "datetime_utc": when.get("utc"),
            "datetime_local": when.get("local"),
            "lat": coords.get("latitude"),
            "lon": coords.get("longitude"),
            "country": loc.get("country"),
            "city": loc.get("city")
        })

# Step 3: Display data
if all_records:
    pdf = pd.DataFrame(all_records)
    df = spark.createDataFrame(pdf)
    print("✅ Current Data Fetched:")
    df.display()
else:
    # Report the empty case instead of finishing silently.
    print("No measurements were returned for the requested locations.")


### NYC DATA

In [0]:
import requests
import pandas as pd

from pyspark.sql import SparkSession

# Obtain the Spark session explicitly, as the later cells in this notebook do,
# instead of relying on a `spark` variable left over from elsewhere.
spark = SparkSession.builder.getOrCreate()

# NYC 311 API - get the latest 1000 requests
# The query options are passed via `params` so that requests URL-encodes them
# (the ordering clause contains a space).
url = "https://data.cityofnewyork.us/resource/erm2-nwe9.json"
query = {"$limit": 1000, "$order": "created_date DESC"}

# Fetch JSON; fail fast on HTTP errors so an error payload is never converted
# into a DataFrame, and bound the wait since requests has no default timeout.
response = requests.get(url, params=query, timeout=30)
response.raise_for_status()
data = response.json()

if not data:
    # Nothing to convert; Spark schema inference needs at least one row.
    print("No data returned from the API!")
else:
    # Convert to Pandas first
    pdf = pd.DataFrame(data)

    # Convert Pandas → Spark DataFrame
    df = spark.createDataFrame(pdf)

    # Show sample data
    df.display()


### USGS DATA

In [0]:
import requests
import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# USGS API - all earthquakes in the past day
url = "https://earthquake.usgs.gov/earthquakes/feed/v1.0/summary/all_day.geojson"

# Fetch JSON; fail fast on HTTP errors and bound the wait, since requests has
# no default timeout.
response = requests.get(url, timeout=30)
response.raise_for_status()
data = response.json()

# The earthquake data is under 'features'
earthquakes = data['features']

# Flatten each feature (properties + geometry) into one flat record
records = []
for eq in earthquakes:
    # Copy all properties; tolerate a missing or null 'properties' entry.
    rec = dict(eq.get('properties') or {})

    # Add geometry fields. `or` also covers a geometry / coordinates entry
    # that is present but null, which a .get(key, default) would not.
    geom = eq.get('geometry') or {}
    coords = list(geom.get('coordinates') or [])  # [lon, lat, depth]
    # Pad to three entries so a short coordinate list yields None values
    # instead of raising IndexError.
    coords += [None] * (3 - len(coords))
    rec['longitude'], rec['latitude'], rec['depth'] = coords[:3]

    # Add ID
    rec['id'] = eq.get('id')

    records.append(rec)

if not records:
    # Nothing to convert; Spark schema inference needs at least one row.
    print("No data returned from the API!")
else:
    # Convert to Pandas DataFrame
    pdf = pd.DataFrame(records)

    # Convert to Spark DataFrame
    df = spark.createDataFrame(pdf)

    # Show all columns and sample rows
    df.display()


### NEW YORK TRAFFIC DATA

In [0]:
import requests
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp

spark = SparkSession.builder.getOrCreate()

# --- Fetch data from the NYC Open Data portal (dataset 5uac-w243) ---
# NOTE(review): confirm that dataset 5uac-w243 is the intended traffic dataset.
url = "https://data.cityofnewyork.us/resource/5uac-w243.json?$limit=1000"

# Fail fast on HTTP errors so an error payload is never treated as data, and
# bound the wait since requests has no default timeout.
response = requests.get(url, timeout=30)
response.raise_for_status()
data = response.json()

# Check if data is returned
if not data:
    print("No data returned from the API!")
else:
    # --- Convert to Pandas DataFrame ---
    pdf = pd.DataFrame(data)

    # --- Convert to Spark DataFrame ---
    df = spark.createDataFrame(pdf)

    # --- Add ingestion timestamp ---
    df = df.withColumn("ingest_time", current_timestamp())

    # --- Display the result ---
    df.display()


### DOT Traffic Speeds NBE (updates every minute)


In [0]:
import requests
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp

# Spark session
spark = SparkSession.builder.getOrCreate()

# --- Step 1: Fetch DOT Traffic Speeds NBE ---
url = "https://data.cityofnewyork.us/resource/i4gi-tjb9.json?$limit=1000"

# Fail fast on HTTP errors so an error payload is never converted into a
# DataFrame, and bound the wait since requests has no default timeout.
response = requests.get(url, timeout=30)
response.raise_for_status()
data = response.json()

if not data:
    # Nothing to convert; Spark schema inference needs at least one row.
    print("No data returned from the API!")
else:
    # --- Step 2: Convert to Pandas DataFrame ---
    pdf = pd.DataFrame(data)

    # --- Step 3: Convert to Spark DataFrame ---
    df = spark.createDataFrame(pdf)

    # --- Step 4: Display the result ---
    df.display()

In [0]:
import requests
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp

# Spark session
spark = SparkSession.builder.getOrCreate()

# --- Step 1: Fetch NYC Open Data dataset n6c5-95xh ---
# NOTE(review): the original comment called this "DOT Traffic Speeds NBE", but
# the dataset id differs from the one used for that feed — confirm which
# dataset this is meant to be.
url = "https://data.cityofnewyork.us/resource/n6c5-95xh.json?$limit=1000"

# Fail fast on HTTP errors so an error payload is never converted into a
# DataFrame, and bound the wait since requests has no default timeout.
response = requests.get(url, timeout=30)
response.raise_for_status()
data = response.json()

if not data:
    # Nothing to convert; Spark schema inference needs at least one row.
    print("No data returned from the API!")
else:
    # --- Step 2: Convert to Pandas DataFrame ---
    pdf = pd.DataFrame(data)

    # --- Step 3: Convert to Spark DataFrame ---
    df = spark.createDataFrame(pdf)

    # --- Step 4: Display the result ---
    df.display()