# Data pipeline: collect city, population, airport, weather and flight data and load it into MySQL

## Importing `libraries`

In [1]:
import json
import os
import re
from datetime import datetime, timedelta, date
from urllib.parse import quote_plus

import pandas as pd
import pytz
import requests
import sqlalchemy
from bs4 import BeautifulSoup
from pytz import timezone

## Setting `timezone`

In [3]:
# Timezone used to localise `datetime.now()` calls, e.g. when stamping the population year.
local_timezone = timezone("Europe/Berlin")

## Setting up the local `python2sql` (MySQL) connection

In [20]:
# Local MySQL connection settings for SQLAlchemy.
# The password comes from the SQL_PW environment variable and is URL-encoded,
# because characters such as '@', ':' or '/' would otherwise break the
# connection URL. An unset SQL_PW yields an empty password instead of the
# literal text "None".
schema = "Gans"
host = "localhost"
user = "root"
password = os.getenv("SQL_PW")
port = 3306
con = f"mysql+pymysql://{user}:{quote_plus(password or '')}@{host}:{port}/{schema}"

## `python2sql` connection to the `AWS` cloud database (running this cell replaces the local `con` defined above)

In [4]:
# AWS RDS MySQL connection settings. Running this cell after the local one
# rebinds `con`, so all later reads and writes go to the cloud database.
# The password is read from the AWS_SQL_PW environment variable instead of
# being hard-coded in the notebook, and is URL-encoded for the SQLAlchemy URL.
# NOTE(review): the password that used to be committed here must be treated
# as leaked and should be rotated.
schema = "gans"
host = "wbs-project3-db.cbbipnfmzuch.eu-central-1.rds.amazonaws.com"
user = "admin"
password = os.getenv("AWS_SQL_PW")
port = 3306
con = f"mysql+pymysql://{user}:{quote_plus(password or '')}@{host}:{port}/{schema}"

## Prompt for the `cities` you want to investigate

In [5]:
# Interactively collect the cities to process; type 'quit' to finish.
# Entries are stripped of surrounding whitespace, blank entries are ignored,
# and a city entered twice is kept only once: every city is appended to the
# SQL tables, and duplicate rows would later make the `.item()` lookups fail.
list_of_cities = []

while True:
    entry = input("Enter a city (or type 'quit' to exit): ").strip()

    if entry.lower() == "quit":
        break  # user is done entering cities

    if not entry or entry in list_of_cities:
        continue  # skip accidental empty submissions and repeats

    list_of_cities.append(entry)
list_of_cities

['Berlin', 'Hamburg']

## Defining a function to create a `cities_df`

In [6]:
def get_cities(city_list):
    """Fetch city facts from the API Ninjas city endpoint into one DataFrame.

    Parameters
    ----------
    city_list : list of str
        City names to look up.

    Returns
    -------
    pandas.DataFrame
        One row per record returned by the API. ``name`` is renamed to
        ``city_name`` and ``country`` to ``country_code``, and a
        ``timestamp_population`` column holds the current year in
        ``local_timezone``. Cities whose request fails or that return no match
        are reported with ``print`` and skipped. If nothing was fetched, an
        empty frame with the columns used downstream is returned instead of
        raising.
    """
    columns = [
        "city_name",
        "country_code",
        "latitude",
        "longitude",
        "population",
        "timestamp_population",
    ]
    frames = []
    for city in city_list:
        # `params` URL-encodes names such as "Frankfurt am Main";
        # the timeout keeps a stalled request from hanging the notebook.
        response = requests.get(
            "https://api.api-ninjas.com/v1/city",
            params={"name": city},
            headers={"X-Api-Key": os.getenv("X_API_KEY")},
            timeout=30,
        )
        if response.status_code != requests.codes.ok:
            # Best effort: report the failure and continue with the next city.
            print("Error:", response.status_code, response.text)
            continue

        records = response.json()
        if not records:
            print("No match for city:", city)
            continue

        frames.append(
            pd.DataFrame(records)
            .rename(columns={"name": "city_name", "country": "country_code"})
            .assign(timestamp_population=datetime.now(local_timezone).year)
        )

    # pd.concat raises ValueError on an empty list, so handle that case explicitly.
    if not frames:
        return pd.DataFrame(columns=columns)
    return pd.concat(frames, ignore_index=True)

## Pushing the `cities_df` to `cities` table in SQL

In [7]:
# Fetch the selected cities and append their location columns to `cities`.
# NOTE(review): with if_exists="append", re-running this cell inserts the same
# cities again unless the table enforces uniqueness -- confirm the schema.
(
    get_cities(list_of_cities)
    .loc[:, ["city_name", "country_code", "latitude", "longitude"]]
    .to_sql(name="cities", con=con, if_exists="append", index=False)
)

2

In [17]:
# Read the `cities` table back, now including the `city_id` assigned by the database.
cities_sql = pd.read_sql_table(table_name="cities", con=con)
cities_sql

Unnamed: 0,city_id,city_name,country_code,latitude,longitude
0,1,Berlin,DE,52.5167,13.3833
1,2,Hamburg,DE,53.55,10.0


## Pushing parts of `cities_df` to `populations` table in SQL

In [21]:
# Attach the database `city_id` (from `cities_sql`) to freshly fetched city data
# and append the population figures to `populations`.
(
    get_cities(list_of_cities)
    .merge(cities_sql, on="city_name")
    .loc[:, ["city_id", "city_name", "population", "timestamp_population"]]
    .to_sql(name="populations", con=con, if_exists="append", index=False)
)

2

In [23]:
# Read the `populations` table back to check the inserted rows.
# NOTE(review): `popolations` is a misspelling of `populations`; rename only
# together with any other cells that use this variable.
popolations_sql = pd.read_sql_table(table_name="populations", con=con)
popolations_sql

Unnamed: 0,city_id,city_name,population,timestamp_population
0,1,Berlin,3644826,2023
1,2,Hamburg,1841179,2023


## Defining a function to create an `airports_df`

In [9]:
def get_airports(city_list):
    """Find airports within 50 km of each city using the AeroDataBox API.

    Each city's coordinates are looked up in the SQL ``cities`` table. The
    AeroDataBox location search is then queried once per city, returning at
    most 7 airports and only those with flight info.

    Parameters
    ----------
    city_list : list of str
        City names exactly as stored in ``cities.city_name``.

    Returns
    -------
    pandas.DataFrame
        All airport records returned by the API. ``municipalityName``, ``icao``
        and ``name`` are renamed to ``city_name``, ``airport_icao`` and
        ``airport_name``. For an empty ``city_list``, an empty frame with just
        those three columns is returned (previously this raised
        UnboundLocalError).

    Raises
    ------
    ValueError
        If a city does not match exactly one row of the ``cities`` table.
    requests.HTTPError
        If the API responds with an error status.
    """
    renames = {
        "municipalityName": "city_name",
        "icao": "airport_icao",
        "name": "airport_name",
    }
    if not city_list:
        return pd.DataFrame(columns=list(renames.values()))

    cities_sql = pd.read_sql_table("cities", con=con)

    # Resolve all coordinates before calling the API, so an unknown city fails
    # before any API request is spent.
    coordinates = []
    for city in city_list:
        match = cities_sql.loc[cities_sql["city_name"] == city]
        # .item() raises ValueError unless exactly one row matches.
        coordinates.append((match["latitude"].item(), match["longitude"].item()))

    url = "https://aerodatabox.p.rapidapi.com/airports/search/location"
    headers = {
        "X-RapidAPI-Key": os.getenv("RAPID_API_KEY"),
        "X-RapidAPI-Host": "aerodatabox.p.rapidapi.com",
    }

    frames = []
    for latitude, longitude in coordinates:
        querystring = {
            "lat": f"{latitude}",
            "lon": f"{longitude}",
            "radiusKm": "50",
            "limit": "7",
            "withFlightInfoOnly": "true",
        }
        response = requests.get(url, headers=headers, params=querystring, timeout=30)
        response.raise_for_status()
        frames.append(pd.json_normalize(response.json()["items"]))

    # Concatenate and rename once, after all requests have been made.
    return pd.concat(frames, ignore_index=True).rename(columns=renames)

## Pushing the `airports_df` to `airports` table in SQL

In [10]:
# Append the airports found near each selected city to the `airports` table.
(
    get_airports(list_of_cities)
    .loc[:, ["city_name", "airport_icao", "airport_name"]]
    .to_sql(name="airports", con=con, if_exists="append", index=False)
)

2

In [11]:
# Read the `airports` table back to check the inserted rows.
airports_sql = pd.read_sql_table(table_name="airports", con=con)
airports_sql

Unnamed: 0,city_name,airport_icao,airport_name
0,Berlin,EDDB,Berlin Brandenburg
1,Hamburg,EDDH,Hamburg


## Defining a function to create a `weathers_df`

In [12]:
## Weathers dataframe (through API call)
def get_weathers(city_list):
    """Fetch the OpenWeather forecast for each city into one DataFrame.

    The API key is read from the ``OPEN_WEATHER_KEY`` environment variable,
    and each city's ``city_id`` from the SQL ``cities`` table. One row is
    produced per entry in the forecast response's ``list``.

    Parameters
    ----------
    city_list : list of str
        City names as stored in ``cities.city_name``; also sent as the
        OpenWeather ``q`` query.

    Returns
    -------
    pandas.DataFrame
        Columns ``city_id``, ``forecast_time`` (``dt_txt``), ``outlook``
        (weather description), ``temperature``, ``feels_like`` (metric units),
        ``wind_speed`` and ``rain_prob`` (``pop``). Empty, with those columns,
        when ``city_list`` is empty.

    Raises
    ------
    ValueError
        If a city does not match exactly one row of the ``cities`` table.
    requests.HTTPError
        If the API responds with an error status.
    """
    columns = [
        "city_id",
        "forecast_time",
        "outlook",
        "temperature",
        "feels_like",
        "wind_speed",
        "rain_prob",
    ]
    if not city_list:
        return pd.DataFrame(columns=columns)

    api_key = os.getenv("OPEN_WEATHER_KEY")
    cities_sql = pd.read_sql_table("cities", con=con)

    records = []
    for city in city_list:
        # Loop-invariant per city; .item() raises ValueError unless exactly one row matches.
        city_id = cities_sql.loc[cities_sql["city_name"] == city, "city_id"].item()

        # `params` URL-encodes the city name; the timeout prevents an indefinite hang.
        response = requests.get(
            "https://api.openweathermap.org/data/2.5/forecast",
            params={"q": city, "appid": api_key, "units": "metric"},
            timeout=30,
        )
        response.raise_for_status()

        # NOTE(review): OpenWeather documents `dt_txt` as UTC -- confirm that
        # consumers of `forecast_time` expect UTC rather than local time.
        for entry in response.json()["list"]:
            records.append({
                "city_id": city_id,
                "forecast_time": entry["dt_txt"],
                "outlook": entry["weather"][0]["description"],
                "temperature": entry["main"]["temp"],
                "feels_like": entry["main"]["feels_like"],
                "wind_speed": entry["wind"]["speed"],
                "rain_prob": entry["pop"],
            })

    return pd.DataFrame(records, columns=columns)

## Pushing the `weathers_df` to `weathers` table in SQL

In [11]:
# Append the forecasts for every selected city to the `weathers` table.
(
    get_weathers(list_of_cities)
    .to_sql(name="weathers", con=con, if_exists="append", index=False)
)

160

In [16]:
# Read the `weathers` table back. Rows accumulate across runs because the
# push cell uses if_exists="append".
weathers_sql = pd.read_sql_table(table_name="weathers", con=con)
weathers_sql

Unnamed: 0,id,city_id,forecast_time,outlook,temperature,feels_like,wind_speed,rain_prob
0,1,1,2023-11-08 12:00:00,broken clouds,9.53,6.78,5.60,0.00
1,2,1,2023-11-08 15:00:00,overcast clouds,9.96,7.70,4.59,0.00
2,3,1,2023-11-08 18:00:00,overcast clouds,8.60,6.09,4.40,0.00
3,4,1,2023-11-08 21:00:00,scattered clouds,7.52,4.61,4.71,0.00
4,5,1,2023-11-09 00:00:00,broken clouds,7.70,4.87,4.65,0.00
...,...,...,...,...,...,...,...,...
475,476,2,2023-11-12 21:00:00,scattered clouds,4.88,4.88,1.11,0.00
476,477,2,2023-11-13 00:00:00,broken clouds,5.45,4.68,1.34,0.00
477,478,2,2023-11-13 03:00:00,overcast clouds,5.04,3.84,1.62,0.00
478,479,2,2023-11-13 06:00:00,overcast clouds,3.72,1.98,1.91,0.00


## Defining a function to create a `flights_df`

In [23]:
def get_flights(city_list):
    """Fetch tomorrow's scheduled arrivals at every airport of the given cities.

    Airport ICAO codes are read from the SQL ``airports`` table (matched on
    ``city_name``, in ``city_list`` order). For each airport, AeroDataBox is
    queried for two windows of tomorrow's date, 00:00-12:00 and 12:01-23:59,
    and the ``arrivals`` of each response are collected.

    Parameters
    ----------
    city_list : list of str
        City names as stored in ``airports.city_name``.

    Returns
    -------
    pandas.DataFrame
        Columns ``flight_num``, ``departure_icao``, ``arrival_icao`` and
        ``arrival_time`` (the scheduled local time with its UTC offset cut
        off). Empty, with those columns, when no arrivals were found.

    Raises
    ------
    requests.HTTPError
        If the API responds with an error status.
    """
    columns = ["flight_num", "departure_icao", "arrival_icao", "arrival_time"]
    if not city_list:
        return pd.DataFrame(columns=columns)

    airports_sql = pd.read_sql_table("airports", con=con)
    icaos = [
        icao
        for city in city_list
        for icao in airports_sql.loc[airports_sql["city_name"] == city, "airport_icao"]
    ]

    # NOTE(review): "tomorrow" comes from the machine's local clock, not from
    # `local_timezone` or the airport's timezone -- confirm this is intended.
    tomorrow = (datetime.now() + timedelta(days=1)).strftime("%Y-%m-%d")
    windows = [
        (f"{tomorrow}T00:00", f"{tomorrow}T12:00"),
        (f"{tomorrow}T12:01", f"{tomorrow}T23:59"),
    ]
    querystring = {"withLeg": "true"}
    headers = {
        "X-RapidAPI-Key": os.getenv("RAPID_API_KEY"),
        "X-RapidAPI-Host": "aerodatabox.p.rapidapi.com",
    }

    frames = []
    for icao in icaos:
        for start, end in windows:
            url = f"https://aerodatabox.p.rapidapi.com/flights/airports/icao/{icao}/{start}/{end}"
            response = requests.get(url, headers=headers, params=querystring, timeout=30)
            response.raise_for_status()

            # An empty body (e.g. HTTP 204, presumably returned when there are
            # no flights, as seen for EGKR) cannot be decoded as JSON; treat it
            # as "no arrivals" instead of crashing.
            if response.status_code == 204 or not response.content:
                continue

            arrivals = pd.json_normalize(response.json().get("arrivals", []))
            if arrivals.empty:
                continue  # no columns to select from an empty result

            frames.append(pd.DataFrame({
                "flight_num": arrivals["number"],
                # .get() yields None when no arrival in this response carries the column.
                "departure_icao": arrivals.get("departure.airport.icao"),
                "arrival_icao": icao,
                # Assumes a trailing "+HH:MM" offset, which is cut off here.
                "arrival_time": pd.to_datetime(
                    arrivals["arrival.scheduledTime.local"].str[:-6]
                ),
            }))

    # pd.concat raises ValueError on an empty list, so handle that case explicitly.
    if not frames:
        return pd.DataFrame(columns=columns)
    return pd.concat(frames, ignore_index=True)

## Pushing the `flights_df` to `flights` table in SQL

## Known issue: flight requests for `EGKR` (IATA `KRH`, Redhill Aerodrome near London) failed with JSON decoding errors

In [None]:
# Append tomorrow's arrivals for the selected cities' airports to the `flights` table.
(
    get_flights(list_of_cities)
    .to_sql(name="flights", con=con, if_exists="append", index=False)
)

In [15]:
# Read the `flights` table back to check the inserted rows.
flights_sql = pd.read_sql_table(table_name="flights", con=con)
flights_sql

Unnamed: 0,flight_id,flight_num,departure_icao,arrival_icao,arrival_time
0,1,E4 546,LIRF,EDDB,2023-11-09 06:00:00
1,2,SR 8853,LIRF,EDDB,2023-11-09 06:05:00
2,3,SM 2962,HEMA,EDDB,2023-11-09 06:20:00
3,4,QR 79,OTHH,EDDB,2023-11-09 06:50:00
4,5,BT 211,EVRA,EDDB,2023-11-09 07:55:00
...,...,...,...,...,...
803,804,LH 36,EDDF,EDDH,2023-11-09 22:20:00
804,805,LO 401,EPWA,EDDH,2023-11-09 22:20:00
805,806,OS 167,LOWW,EDDH,2023-11-09 22:00:00
806,807,SN 2629,EBBR,EDDH,2023-11-09 22:10:00
