# ETL Pipeline

CSV source (Washington, DC historical daily weather, Kaggle): https://www.kaggle.com/datasets/taweilo/washington-dc-historical-weather-20158202407

In [11]:
#API side of ETL DATA PROCESSOR PIPELINE
import json
import os
import sqlite3

import pandas as pd
import requests

# weatherstack access key. Read from the environment so the secret is never
# committed with the notebook; "blank" is kept as the fallback placeholder.
API_KEY = os.environ.get("WEATHERSTACK_API_KEY", "blank")
BASE_URL = "http://api.weatherstack.com/current"  # current-weather endpoint
location = "Washington, DC"  # query string sent to the API

def fetch_weather_data(location, timeout=10):
    """Fetch the current-weather JSON payload for *location* from the weatherstack API.

    Args:
        location: Free-text location, sent as the ``query`` request parameter.
        timeout: Seconds to wait for the HTTP response. Without a timeout,
            ``requests`` can block indefinitely on an unresponsive server.

    Returns:
        The decoded JSON body on success.

    Raises:
        Exception: if the HTTP status is not 200, or if the decoded body
            contains an ``error`` key (checked even on HTTP 200).
    """
    params = {
        'access_key': API_KEY,
        'query': location
    }
    response = requests.get(BASE_URL, params=params, timeout=timeout)

    if response.status_code != 200:
        raise Exception(f"Failed to fetch data. Status code: {response.status_code}")

    data = response.json()
    if 'error' in data:
        error = data['error']
        # Use .get so a malformed error object still surfaces as an API error
        # rather than an unrelated KeyError.
        info = error.get('info', error) if isinstance(error, dict) else error
        raise Exception(f"API error: {info}")
    return data

def process_weather_data(data):
    """Flatten a weatherstack current-weather payload into a one-row DataFrame.

    Reads selected fields from ``data['location']`` and ``data['current']``,
    prints a status line and a summary, and returns the DataFrame.

    Args:
        data: Decoded JSON dict, as returned by ``fetch_weather_data``.

    Returns:
        A single-row DataFrame with the columns location_name, region, country,
        localtime, temperature, humidity, weather_descriptions, wind_speed,
        precipitation, uv_index, visibility.

    Raises:
        KeyError: if ``location``/``current`` or one of the other selected
            fields is missing from *data*.
    """
    location_data = data['location']
    current_data = data['current']

    # Only the first description is kept. An empty or missing list yields None
    # instead of raising IndexError.
    descriptions = current_data.get('weather_descriptions') or []
    first_description = descriptions[0] if descriptions else None

    processed_data = {  # one dict of selected fields -> one DataFrame row
        'location_name': location_data['name'],
        'region': location_data['region'],
        'country': location_data['country'],
        'localtime': location_data['localtime'],
        'temperature': current_data['temperature'],
        'humidity': current_data['humidity'],
        'weather_descriptions': first_description,
        'wind_speed': current_data['wind_speed'],
        'precipitation': current_data['precip'],
        'uv_index': current_data['uv_index'],
        'visibility': current_data['visibility']
    }

    df = pd.DataFrame([processed_data])
    print("API Data was fetched.")
    generate_summary(df)
    return df

def modify_weather_data(df):
    """Return a copy of the API frame without the precipitation and visibility columns.

    Prints a status line and a summary of the trimmed frame; *df* itself is
    left unchanged.
    """
    unwanted = ['precipitation', 'visibility']
    trimmed = df.drop(unwanted, axis='columns')
    print("API Data was modified.")
    generate_summary(trimmed)
    return trimmed


def convert_data(input_data, input_format, output_format, file_name):
    """Write *input_data* to disk in the requested output format.

    Supported conversions:
      * ``'csv' -> 'json'``: *input_data* is a DataFrame, written to
        ``{file_name}.json`` as a list of records.
      * ``'json' -> 'csv'``: *input_data* is either parsed JSON (a dict or a
        list of dicts), flattened with ``pd.json_normalize``, or a DataFrame,
        which is written as-is. The result is saved to ``{file_name}.csv``.

    Raises:
        ValueError: for any other ``(input_format, output_format)`` pair.
    """
    if input_format == 'csv' and output_format == 'json':
        input_data.to_json(f'{file_name}.json', orient='records')
        print(f"Data converted from CSV to JSON and saved as {file_name}.json")

    elif input_format == 'json' and output_format == 'csv':
        # json_normalize iterates its argument, and iterating a DataFrame yields
        # its column labels rather than its rows. A DataFrame is therefore
        # written directly instead of being normalized.
        if isinstance(input_data, pd.DataFrame):
            frame = input_data
        else:
            frame = pd.json_normalize(input_data)
        frame.to_csv(f"{file_name}.csv", index=False)
        print(f"Data converted from JSON to CSV and saved as {file_name}.csv")
    else:
        raise ValueError(f"Unsupported conversion from {input_format} to {output_format}")
        
def save_to_sql(data, table_name, db_name="avw2at.db"):
    """Write the DataFrame *data* to *table_name* in the SQLite file *db_name*.

    An existing table with the same name is replaced (``if_exists='replace'``).
    The connection is closed even if the write fails.

    Raises:
        RuntimeError: wrapping any ``sqlite3.DatabaseError`` raised while
            connecting or writing; the original error is chained as ``__cause__``.
    """
    try:
        conn = sqlite3.connect(db_name)
        try:
            data.to_sql(table_name, conn, if_exists='replace', index=False)
        finally:
            conn.close()  # always release the file handle, even if to_sql raised
        print(f"Data saved to {table_name} table in {db_name}")
    except sqlite3.DatabaseError as e:
        raise RuntimeError(f"Error saving data to SQLite: {e}") from e

def generate_summary(df):
    """Print the row and column counts of *df*, followed by its first five rows."""
    n_rows, n_cols = df.shape[0], df.shape[1]
    headline = f"Summary: {n_rows} record(s) with {n_cols} column(s)"
    print(headline)
    print("SUMMARY:\n", df.head())

def main():
    """Run the API half of the pipeline: fetch, process, trim, export to CSV, store in SQLite.

    Any exception from a step is caught and printed so the notebook cell does
    not abort; the remaining steps are skipped once one fails.
    """
    try:
        # Step 1: fetch the raw JSON payload and flatten it into a one-row DataFrame
        raw_api_data = fetch_weather_data(location)
        processed_api_data = process_weather_data(raw_api_data)
        # Step 2: drop the columns not used for analysis
        modified_api_data = modify_weather_data(processed_api_data)
        # Step 3: export as CSV. The 'json' path of convert_data normalizes
        # JSON-like records, so pass the rows as a list of dicts rather than the
        # DataFrame itself.
        convert_data(modified_api_data.to_dict(orient='records'), 'json', 'csv', 'api_weather_converted')
        # Step 4: persist to SQLite.
        # NOTE(review): the CSV pipeline cell also writes to the "weather" table
        # with if_exists='replace', so whichever cell runs last overwrites the
        # other's data. Consider distinct table names.
        save_to_sql(modified_api_data, "weather")
    except Exception as e:
        print(f"An error occurred: {e}")


main()

API Data was fetched.
Summary: 1 record(s) with 11 column(s)
SUMMARY:
   location_name                region                   country  \
0    Washington  District of Columbia  United States of America   

          localtime  temperature  humidity weather_descriptions  wind_speed  \
0  2024-10-18 14:59           20        30        Partly cloudy          10   

   precipitation  uv_index  visibility  
0              0         4          16  
API Data was modified.
Summary: 1 record(s) with 9 column(s)
SUMMARY:
   location_name                region                   country  \
0    Washington  District of Columbia  United States of America   

          localtime  temperature  humidity weather_descriptions  wind_speed  \
0  2024-10-18 14:59           20        30        Partly cloudy          10   

   uv_index  
0         4  
Data converted from JSON to CSV and saved as api_weather_converted.csv
Data saved to weather table in avw2at.db


In [9]:
# CSV SIDE OF ETL DATA PROCESSING PIPELINE
import pandas as pd
import os
import json
import sqlite3
import requests

def fetch_data_from_csv(file):
    """Read the CSV at *file* into a DataFrame, print progress and a summary, and return it.

    Raises:
        RuntimeError: if reading the file (or summarizing it) fails. The
            underlying exception is chained as ``__cause__`` so its traceback
            is preserved.
    """
    try:
        df = pd.read_csv(file)
        print(f"Fetched {len(df)} records from {file}")
        print("CSV Data was fetched.")
        summary(df)
        return df
    except Exception as e:
        raise RuntimeError(f"Error fetching data from CSV: {e}") from e

def convert_data(input_data, input_format, output_format, file_name):
    """Write *input_data* to disk in the requested output format.

    NOTE(review): this re-defines the convert_data from the API cell; whichever
    cell ran last wins. Consider defining it once and sharing it.

    Supported conversions:
      * ``'csv' -> 'json'``: *input_data* is a DataFrame, written to
        ``{file_name}.json`` as a list of records.
      * ``'json' -> 'csv'``: *input_data* is either parsed JSON (a dict or a
        list of dicts), flattened with ``pd.json_normalize``, or a DataFrame,
        which is written as-is. The result is saved to ``{file_name}.csv``.

    Raises:
        ValueError: for any other ``(input_format, output_format)`` pair.
    """
    if input_format == 'csv' and output_format == 'json':
        input_data.to_json(f'{file_name}.json', orient='records')
        print(f"Data converted from CSV to JSON and saved as {file_name}.json")

    elif input_format == 'json' and output_format == 'csv':
        # json_normalize iterates its argument, and iterating a DataFrame yields
        # its column labels rather than its rows. A DataFrame is therefore
        # written directly instead of being normalized.
        if isinstance(input_data, pd.DataFrame):
            frame = input_data
        else:
            frame = pd.json_normalize(input_data)
        frame.to_csv(f"{file_name}.csv", index=False)
        print(f"Data converted from JSON to CSV and saved as {file_name}.csv")

    else:
        raise ValueError(f"Unsupported conversion from {input_format} to {output_format}")


def modify_csv_data(data, columns=None):
    """Return only the analysis columns of the raw CSV DataFrame.

    Prints a status line and a summary of the result.

    Args:
        data: DataFrame read from the source CSV.
        columns: Column names to keep, in output order. Any iterable of names
            is accepted. Defaults to the nine analysis columns: name, datetime,
            temp, humidity, precip, snow, windspeed, uvindex, description.

    Returns:
        A new DataFrame containing only *columns*.

    Raises:
        RuntimeError: if selecting the columns (or printing the summary) fails,
            e.g. because a requested column is missing. The original error is
            chained as ``__cause__``.
    """
    if columns is None:
        columns = [
            "name",
            "datetime",
            "temp",
            "humidity",
            "precip",
            "snow",
            "windspeed",
            "uvindex",
            "description"
        ]
    try:
        # list() so a tuple is treated as several column names, not one MultiIndex key
        modified_data = data[list(columns)]
        print("CSV Data was modified.")
        summary(modified_data)
        return modified_data
    except Exception as e:
        raise RuntimeError(f"Error modifying CSV data: {e}") from e

def save_to_sql(data, table_name, db_name="avw2at.db"):
    """Write the DataFrame *data* to *table_name* in the SQLite file *db_name*.

    An existing table with the same name is replaced (``if_exists='replace'``).
    The connection is closed even if the write fails.

    Raises:
        RuntimeError: wrapping any ``sqlite3.DatabaseError`` raised while
            connecting or writing; the original error is chained as ``__cause__``.
    """
    try:
        conn = sqlite3.connect(db_name)
        try:
            data.to_sql(table_name, conn, if_exists='replace', index=False)
        finally:
            conn.close()  # always release the file handle, even if to_sql raised
        print(f"Data saved to {table_name} table in {db_name}")
    except sqlite3.DatabaseError as e:
        raise RuntimeError(f"Error saving data to SQLite: {e}") from e

def summary(data):
    """Print the row and column counts of *data*, followed by its first five rows.

    Called after each CSV pipeline stage (ingestion and modification).
    """
    n_rows, n_cols = data.shape[0], data.shape[1]
    headline = f"Summary: {n_rows} record(s) with {n_cols} column(s)"
    print(headline)
    print("SUMMARY:\n", data.head())

def main():
    """Run the CSV half of the pipeline: read, trim columns, export to JSON, store in SQLite.

    Any exception from a step is caught and printed so the notebook cell does
    not abort; the remaining steps are skipped once one fails.
    """
    try:
        # Step 1: read dc_weather.csv from the current working directory
        csv_file = os.path.join(os.getcwd(), "dc_weather.csv")
        csv_data = fetch_data_from_csv(csv_file)
        # Step 2: keep only the analysis columns (the column list is defined in
        # modify_csv_data, so it is not duplicated here)
        modified_csv_data = modify_csv_data(csv_data)
        # Step 3: export the trimmed frame as JSON records
        convert_data(modified_csv_data, 'csv', 'json', 'csv_weather_converted')
        # Step 4: persist to SQLite.
        # NOTE(review): the API pipeline cell also writes to the "weather" table
        # with if_exists='replace', so whichever cell runs last overwrites the
        # other's data. Consider distinct table names.
        save_to_sql(modified_csv_data, "weather")

    except Exception as e:
        print(f"Error occurred: {e}")


main()

Fetched 3319 records from /Users/sophiazhang/Downloads/DS 2002/dc_weather.csv
CSV Data was fetched.
Summary: 3319 record(s) with 33 column(s)
SUMMARY:
                 name    datetime  tempmax  tempmin  temp  feelslikemax  \
0  Washington,DC,USA  2015-08-01     33.1     22.8  28.2          34.0   
1  Washington,DC,USA  2015-08-02     32.0     22.8  27.3          31.1   
2  Washington,DC,USA  2015-08-03     33.2     21.8  27.9          34.9   
3  Washington,DC,USA  2015-08-04     35.3     24.9  29.3          36.6   
4  Washington,DC,USA  2015-08-05     33.6     24.0  28.6          33.0   

   feelslikemin  feelslike   dew  humidity  ...  solarenergy  uvindex  \
0          22.8       28.6  17.4      53.3  ...         22.9        8   
1          22.8       27.2  15.4      49.1  ...         22.4        8   
2          21.8       28.7  18.6      58.7  ...         24.1        9   
3          24.9       30.4  19.0      55.2  ...         21.9        9   
4          24.0       28.6  17.6      