In [2]:
import requests
import pandas as pd
import sqlite3
import schedule
import time

# Extract: Get weather data from OpenWeather API
def extract_weather_data():
    api_key = 'c8f939fef8908e69e9d56c21ab1fa5ad'

    city = 'London'  # City to get weather data for
    url = f'https://api.openweathermap.org/data/2.5/weather?q=London&appid=c8f939fef8908e69e9d56c21ab1fa5ad'


    try:
        response = requests.get(url)
        data = response.json()

        if response.status_code == 200:
            # Only extract relevant fields
            weather_data = {
                'city': data['name'],
                'temperature': data['main']['temp'],
                'humidity': data['main']['humidity'],
                'weather': data['weather'][0]['description'],
                'timestamp': pd.to_datetime('now')
            }
            print("Data extracted successfully.")
            return weather_data
        else:
            print(f"Error fetching data: {data['message']}")
            return None
    except Exception as e:
        print(f"Error in extraction: {e}")
        return None

In [3]:
# Transform: Clean and transform the weather data
def transform_data(weather_data):
    """
    Transforms the raw weather data with complex transformations.
    """

    # Convert temperature from Kelvin to Celsius
    weather_data['temperature'] = weather_data['temperature'] - 273.15

    # Add 'Feels Like' temperature (simple approximation based on actual temperature and humidity)
    weather_data['feels_like'] = weather_data['temperature'] - ((100 - weather_data['humidity']) / 5)

    # Categorize humidity levels into 'Low', 'Medium', 'High'
    def categorize_humidity(humidity):
        if humidity < 30:
            return 'Low'
        elif 30 <= humidity <= 60:
            return 'Medium'
        else:
            return 'High'

    # Apply categorization directly to the single humidity value
    weather_data['humidity_category'] = categorize_humidity(weather_data['humidity'])

    # Categorize weather conditions into custom labels (simplified)
    def categorize_weather(description):
        if 'clear' in description:
            return 'Clear'
        elif 'clouds' in description:
            return 'Cloudy'
        elif 'rain' in description or 'drizzle' in description:
            return 'Rainy'
        elif 'snow' in description:
            return 'Snowy'
        elif 'thunderstorm' in description:
            return 'Thunderstorm'
        else:
            return 'Other'

    # Directly call categorize_weather on the string (no apply needed)
    weather_data['weather_category'] = categorize_weather(weather_data['weather'])

    # Add weather insights: 'Cold', 'Warm', 'Hot' based on temperature
    def temperature_insight(temp):
        if temp < 0:
            return 'Cold'
        elif 0 <= temp <= 20:
            return 'Warm'
        else:
            return 'Hot'

    # Apply temperature insight directly to the temperature
    weather_data['temperature_insight'] = temperature_insight(weather_data['temperature'])

    # Adding a simple date feature: Day of the week (adjusted for single timestamp)
    weather_data['day_of_week'] = weather_data['timestamp'].strftime('%A')  # Directly extract day name from timestamp

    # Return the transformed data
    print("Data transformed successfully.")
    return weather_data

In [4]:
def load_data(weather_data):
    if weather_data:
        # Connect to SQLite database (or create it if it doesn't exist)
        conn = sqlite3.connect('weather_data.db')
        cursor = conn.cursor()

        # Create a table if it doesn't exist
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS weather (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                city TEXT,
                temperature REAL,
                humidity INTEGER,
                weather TEXT,
                timestamp DATETIME
            )
        ''')
    
        weather_data['timestamp'] = weather_data['timestamp'].strftime('%Y-%m-%d %H:%M:%S')

        # Insert the data into the table
        cursor.execute('''
            INSERT INTO weather (city, temperature, humidity, weather, timestamp)
            VALUES (?, ?, ?, ?, ?)
        ''', (weather_data['city'], weather_data['temperature'], weather_data['humidity'], weather_data['weather'], weather_data['timestamp']))

        # Commit the transaction and close the connection
        conn.commit()
        conn.close()

        print(f"Data loaded into the database: {weather_data['city']}")
    else:
        print("No data to load.")

In [5]:
def etl_pipeline():
    weather_data = extract_weather_data()
    if weather_data:
        transformed_data = transform_data(weather_data)
        load_data(transformed_data)
    else:
        print("No data to process.")

In [6]:
etl_pipeline()

Data extracted successfully.
Data transformed successfully.
Data loaded into the database: London


  'timestamp': pd.to_datetime('now')


In [7]:
def schedule_pipeline():
    try:
        schedule.every(1).hour.do(etl_pipeline)
        print("Pipeline scheduled to run every hour.")
    except Exception as e:
        logging.error(f"Error scheduling pipeline: {e}")
        print(f"Error scheduling pipeline: {e}")

In [8]:
# Run the scheduler
if __name__ == "__main__":
    try:
        schedule_pipeline()  # Schedule the task
        while True:
            schedule.run_pending()
            time.sleep(60)  # Wait 1 minute before checking again
    except KeyboardInterrupt:
        print("Scheduler stopped manually.")
    except Exception as e:
        logging.error(f"Unexpected error in scheduler loop: {e}")
        print(f"Unexpected error in scheduler loop: {e}")

Pipeline scheduled to run every hour.
Scheduler stopped manually.
