<span style="color:red; font-family:Helvetica Neue, Helvetica, Arial, sans-serif; font-size:2em;">An Exception was encountered at '<a href="#papermill-error-cell">In [4]</a>'.</span>

# 🌍 Global Air Quality Tracker

This project uses the IQAir API to fetch real-time air quality data from global cities, stores the data in MongoDB, and prepares it for visualization in Power BI or Tableau.

**ETL Flow:**

1. Extract air quality data from IQAir API
2. Transform & normalize the data
3. Load into MongoDB
4. Visualize on Power BI or Tableau Public

Load Environment Variables and Required Libraries

We import essential libraries to interact with the IQAir API and manage credentials securely.

- `os` and `dotenv`: Load environment variables from a `.env` file so sensitive information like API keys is not hardcoded.
- `requests`: Used to make HTTP requests to the IQAir API.

In [1]:
import os
from dotenv import load_dotenv
import requests
import pymongo
import pandas as pd
import time

In [2]:
load_dotenv()
API_KEY = os.getenv("API_KEY")
BASE_URL = os.getenv("BASE_URL")
MONGO_URI = os.getenv("MONGO_URI", "mongodb://localhost:27017/")
client = pymongo.MongoClient(MONGO_URI)
db = client[os.getenv("MONGO_DB_NAME")]

Populating Array with Cities Reuqired for Report

In [3]:
# Fetching the list of German cities and their states
country = "Germany"
collection = db["germany_cities_states"]


<span id="papermill-error-cell" style="color:red; font-family:Helvetica Neue, Helvetica, Arial, sans-serif; font-size:2em;">Execution using papermill encountered an exception here and stopped:</span>

In [4]:
# Step 1: Get all states in Germany
states_url = f"{BASE_URL}/v2/states?country={country}&key={API_KEY}"
states_response = requests.get(states_url)
print(states_response)
states_data = states_response.json()

states = [state['state'] for state in states_data['data']]

<Response [429]>


TypeError: string indices must be integers, not 'str'

In [None]:
# Step 2: For each state, get all cities (with rate limiting)
all_cities = []
for state in states:
    cities_url = f"{BASE_URL}/v2/cities?state={state}&country={country}&key={API_KEY}"
    cities_response = requests.get(cities_url)
    print(cities_response)
    if cities_response.status_code == 429:
        print(f"Rate limit hit for state {state}. Waiting 60 seconds...")
        time.sleep(60)
        cities_response = requests.get(cities_url)
    cities_data = cities_response.json()
    cities = [city['city'] for city in cities_data['data']]
    for city in cities:
        all_cities.append({"country": country, "state": state, "city": city})

In [None]:
# Step 3: Store in MongoDB
if all_cities:
    collection.delete_many({})
    collection.insert_many(all_cities)
    print(f"Inserted {len(all_cities)} city-state pairs into MongoDB.")
else:
    print("No cities found.")

In [None]:
# # Extract air quality data for all German cities and store in MongoDB

# source_collection = db["germany_cities_states"]
# target_collection = db["germany_city_air_quality"]

# city_docs = list(source_collection.find({}))

# for doc in city_docs:
#     city = doc["city"]
#     state = doc["state"]
#     country = doc["country"]
#     url = f"{BASE_URL}/v2/city?city={city}&state={state}&country={country}&key={API_KEY}"

#     response = requests.get(url)
#     if response.status_code == 429:
#         print(f"Rate limit hit for {city}, {state}. Waiting 60 seconds...")
#         time.sleep(60)
#         response = requests.get(url)

#     if response.status_code == 200:
#         data = response.json()
#         target_collection.update_one(
#             {"city": city, "state": state, "country": country},
#             {"$set": {"data": data}},
#             upsert=True
#         )
#         print(f"Stored air quality for {city}, {state}")
#     else:
#         print(f"Failed for {city}, {state}: {response.status_code}")

In [None]:
# # Fetch all records from MongoDB
# records = list(db["germany_city_air_quality"].find({}))

# # Normalize nested fields for DataFrame
# def extract_current_fields(doc):
#     try:
#         current = doc.get("data", {}).get("data", {}).get("current", {})
#         weather = current.get("weather", {})
#         pollution = current.get("pollution", {})
#         return {
#             "city": doc.get("city"),
#             "state": doc.get("state"),
#             "country": doc.get("country"),
#             **{f"weather_{k}": weather.get(k) for k in ["ts", "ic", "hu", "pr", "tp", "wd", "ws"]},
#             **{f"pollution_{k}": pollution.get(k) for k in ["ts", "aqius", "mainus", "aqicn", "maincn"]}
#         }
#     except Exception as e:
#         return {"error": str(e)}

# df = pd.DataFrame([extract_current_fields(doc) for doc in records])

# # Check for missing values in weather and pollution columns
# weather_cols = [col for col in df.columns if col.startswith("weather_")]
# pollution_cols = [col for col in df.columns if col.startswith("pollution_")]

# print("Missing values in weather columns:")
# print(df[weather_cols].isnull().sum())

# print("\nMissing values in pollution columns:")
# print(df[pollution_cols].isnull().sum())

# # Show rows with any missing weather or pollution data
# missing = df[df[weather_cols + pollution_cols].isnull().any(axis=1)]
# print(f"\nRows with missing or inconsistent data: {len(missing)}")
# display(missing.head())

# cleaned_df = df.fillna({col: "MISSING" for col in weather_cols + pollution_cols})

# # Convert DataFrame records to dictionaries for MongoDB
# cleaned_records = cleaned_df.to_dict(orient="records")

# # Insert into a new MongoDB collection
# cleaned_collection = db["germany_city_air_quality_cleaned"]
# cleaned_collection.delete_many({})  # Clear previous data if any
# if cleaned_records:
#     cleaned_collection.insert_many(cleaned_records)
#     print(f"Inserted {len(cleaned_records)} cleaned records into 'germany_city_air_quality_cleaned'.")
# else:
#     print("No cleaned records to insert.")