# Data Quality and Data Wrangling 
## Course Code: DLBDSDQDW01

## Task 2: Scrape the web

This notebook describes the implementation of Task 2 of the Data Quality and Data Wrangling course (DLBDSDQDW01). It contains the code used for experimentation and for creating the visualizations according to the requirements in the task description.

### Data sources
The data was collected from the following sources:

1. [OpenWeather](https://openweathermap.org/api/one-call-3#concept): weather data such as temperature, humidity and pressure
2. [AlphaVantage](https://www.alphavantage.co/documentation/): stock market data
3. [USGS Earthquake](https://earthquake.usgs.gov/fdsnws/event/1/): worldwide earthquake data

In [None]:
# used packages
import requests
import numpy as np
import pandas as pd
import geopandas as gpd
import geodatasets
import cartopy.crs as ccrs
import cartopy.feature as cfeature
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
from matplotlib.colors import LinearSegmentedColormap
import re
import time
import os
import json
from dotenv import load_dotenv
from pathlib import Path
from datetime import date, datetime, timezone, timedelta
from zoneinfo import ZoneInfo
from shapely.geometry import Point

In [None]:
# Read the API keys from config/.env (one level above the notebook's folder) into the environment
load_dotenv(Path.cwd().parent / "config" / ".env")

# Define the locations for the time series

In [None]:
# Cities to track: used as geocoding queries and, later, as group keys and plot titles
# (underscores are replaced by spaces in the titles).
cities = "Berlin Munich Hamburg Baden_Baden Paris Madrid Mexico_city Tokio".split()

# Weather data

Since the API provides data for a single timestamp only, not a summary of the whole day, the request is run every 2 hours to obtain 12 readings per day.

In [None]:
# OpenWeather API key taken from the environment (None when the variable is not set)
OPENWEATHER_API_KEY = os.environ.get("OPENWEATHER_API_KEY")

In [None]:
def fetch(url, headers: dict = None, params: dict = None, timeout: float = 30):
    """Send a GET request and return the decoded JSON body.

    Parameters
    ----------
    url : str
        Endpoint to query.
    headers : dict, optional
        Extra HTTP headers sent with the request (previously accepted but ignored).
    params : dict, optional
        Query-string parameters.
    timeout : float, optional
        Seconds to wait for the server (default 30), so that a stalled endpoint
        cannot block the notebook indefinitely.

    Returns
    -------
    The parsed JSON payload (``response.json()``).

    Raises
    ------
    requests.HTTPError
        If the server answers with an error status (via ``raise_for_status``).
    """
    response = requests.get(url, params=params, headers=headers, timeout=timeout)
    response.raise_for_status()
    return response.json()

In [None]:
# geocoding endpoint - to get coordinates of the cities.
# HTTPS is required: the API key travels in the query string and must not be sent in clear text.
url_geocoding = "https://api.openweathermap.org/geo/1.0/direct?"
# "q" is filled in per city before each request; limit=1 asks for at most one result per query
params_geocoding = {"q": None, "limit": 1, "appid": OPENWEATHER_API_KEY}

In [None]:
# Geocode every city: one request per query, pausing 1 s between requests (throttling).
responses_cities = []
for query in cities:
    params_geocoding["q"] = query
    responses_cities.append(fetch(url_geocoding, params=params_geocoding))
    time.sleep(1)

In [None]:
# Inspect the raw geocoding responses (one list per queried city)
responses_cities

In [None]:
# Flatten the geocoding responses into one row per resolved location.
# Each element of responses_cities is the API's list of matches for one query; each match is a
# dict with (among others) name, country, lon (x) and lat (y).
rows = [
    {"city": match["name"], "country": match["country"], "lon": match["lon"], "lat": match["lat"]}
    for matches in responses_cities
    for match in matches
]
df_geolocations = pd.DataFrame(rows, columns=["city", "country", "lon", "lat"])
df_geolocations.to_csv("geocoding_openweather.csv", index=False)

In [None]:
# Geocoded locations, with the city names as spelled by the API
df_geolocations

In [None]:
# The name returned by the API can differ from the query (e.g. "Mexico_city"), so put the query
# names back: they are the identifiers used for grouping and plot titles later on.
# The mapping is positional and therefore only valid if every query produced exactly one row;
# fail loudly instead of silently shifting names onto the wrong coordinates.
if len(df_geolocations) != len(cities):
    raise ValueError(
        f"Expected one geocoding result per city ({len(cities)}), got {len(df_geolocations)} rows"
    )
df_geolocations = df_geolocations.assign(city=cities)
df_geolocations

In [None]:
# df_geolocations is a plain DataFrame and .explore() only exists on GeoDataFrames, so build
# point geometries from the lon/lat columns (WGS84) for the interactive map.
gpd.GeoDataFrame(
    df_geolocations,
    geometry=gpd.points_from_xy(df_geolocations["lon"], df_geolocations["lat"]),
    crs="EPSG:4326",
).explore()

In [None]:
# weather endpoint - returns the weather at one location (lon/lat) per request
url_weather = "https://pro.openweathermap.org/data/2.5/weather"

In [None]:
# Berlin time zone: the request timestamps are taken in German local time
berlin = ZoneInfo(key="Europe/Berlin")

In [None]:
# Current time in Berlin (second precision), sent along with each request
today = datetime.now(berlin).replace(microsecond=0)

responses_weather = []

for _, record in df_geolocations.iterrows():
    params = {"units": "metric",  # temperatures in °C
              "lon": record["lon"],
              "lat": record["lat"],
              # NOTE(review): the current-weather endpoint does not appear to document a "date"
              # parameter, so it is probably ignored - confirm against the API docs
              "date": today.isoformat(),
              "appid": OPENWEATHER_API_KEY}
    # timeout, so a stalled request cannot block the scheduled run forever
    response = requests.get(url_weather, params=params, timeout=30)
    response.raise_for_status()
    # keep the Response object: its JSON is decoded in a later cell
    responses_weather.append(response)
    time.sleep(1)  # throttle: one request per second

In [None]:
# What does a response look like? Show the decoded JSON of the first location
responses_weather[0].json()

In [None]:
# Where each column is found in the weather JSON payload (sequence of keys / list indices).
# The key order defines the column order of df_weather.
weather_fields = {
    "name": ("name",),  # city name as resolved by the API - may differ from the query name
    "temperature": ("main", "temp"),  # temperature (°C with units=metric)
    "temperature_max": ("main", "temp_max"),  # max temperature at the moment
    "temperature_min": ("main", "temp_min"),  # min temperature at the moment
    "feels_like": ("main", "feels_like"),  # human perception of the temperature
    "humidity": ("main", "humidity"),  # relative humidity in %
    "wind_speed": ("wind", "speed"),  # wind speed (m/s with units=metric)
    "wind_direction": ("wind", "deg"),  # wind direction
    "description": ("weather", 0, "description"),  # text of the first weather condition
}


def _lookup(payload, path):
    """Follow a sequence of keys/indices into a nested JSON payload."""
    for step in path:
        payload = payload[step]
    return payload


weather = {column: [] for column in weather_fields}
weather["timestamp"] = []

for response in responses_weather:
    weather_data = response.json()
    for column, path in weather_fields.items():
        weather[column].append(_lookup(weather_data, path))
    # "dt" is a Unix timestamp in seconds
    weather["timestamp"].append(pd.to_datetime(weather_data["dt"], unit="s"))

df_weather = pd.DataFrame(weather)
df_weather

In [None]:
# combine coordinates (df_geolocations) and readings (df_weather) side by side into one weather
# dataframe. Rows are aligned on their default (positional) index, so both must list the cities in
# the same order. The API's "name" column is dropped in favour of the query-based "city" column.
df_weather = (
    pd.concat([df_geolocations, df_weather], axis=1)
    .drop(columns="name")
)
df_weather

In [None]:
# Columns of the combined weather dataframe
df_weather.columns

In [None]:
# key used by store() to write one HDF table per city
df_weather = df_weather.assign(split_on=df_weather["city"])

In [None]:
# Dtypes and non-null counts of the weather dataframe
df_weather.info()

In [None]:
# Keep a running history of all readings in weather_data.csv: if the file exists, append the latest
# readings to it; otherwise start the file with the current readings.
history_path = Path("weather_data.csv")
if history_path.exists():
    print("loading latest data")
    # Parse the timestamps back into datetimes. Read as plain strings, they would be mixed with the
    # freshly fetched datetime values, and sorting by "timestamp" below would fail or mis-order.
    history_df = pd.read_csv(history_path, parse_dates=["timestamp"])
    # this dataframe is the final weather data. Store in staging area to combine later with further data
    # NOTE(review): df_weather now holds the whole history, and it is what gets appended to the HDF
    # store later on - check whether that re-appends rows that are already stored.
    df_weather = pd.concat([history_df, df_weather], axis=0).sort_values(by=["city", "timestamp"], ascending=False)
    df_weather.to_csv(history_path, index=False)
else:
    print("weather_data.csv does not exist. Latest data will be stored")
    df_weather.to_csv(history_path, index=False)

# Stock market data

In [None]:
# AlphaVantage API key taken from the environment (None when the variable is not set)
ALPHAVANTAGE_API_KEY = os.environ.get("ALPHAVANTAGE_API_KEY")

In [None]:
# Symbols to follow. A longer list is kept commented out; currently only META is active
# (note the trailing comma: a one-element tuple).
#tickers = ("NVDA","AAPL")#,"META","RHM.DE","SPY","URTH","ACWI")
tickers = ("META",)

In [None]:
# Symbols that will be requested
tickers

In [None]:
# AlphaVantage query endpoint; the API function is selected with the "function" parameter
url_stocks = "https://www.alphavantage.co/query"

In [None]:
# To build the time series two cases are distinguished:
#   1) first run - no data is stored yet for a symbol, so its whole (compact) history is kept;
#   2) the symbol already exists in the datastore - only its latest row is kept (next cells).
# The symbols already stored are read from the datastore's "/stocks/data/<symbol>" keys.
# The path is relative to the project folder (the notebook's parent directory), as in the
# cell that loads the datastore further below.
datastore_path = Path().cwd().parents[0].joinpath("data/processed/datastore.h5")

keys = []  # symbols already present in the datastore; stays empty on the first run
if datastore_path.exists():
    # named store_ro so it does not shadow the store() function defined later
    with pd.HDFStore(datastore_path, "r") as store_ro:
        print("DataStore content", store_ro.keys())
        keys = [key.split("/")[-1] for key in store_ro.keys() if key.split("/")[1:3] == ["stocks", "data"]]
    print(keys)

# Fetch the daily series of every ticker in both cases, so the very first run also gets data.
stocks = []
for symbol in tickers:
    params_stocks = {"function": "TIME_SERIES_DAILY",  # daily time series of the equity
                     "symbol": symbol,  # the equity to follow
                     "outputsize": "compact",  # most recent 100 data points
                     "datatype": "json",
                     "apikey": ALPHAVANTAGE_API_KEY}
    response = requests.get(url_stocks, params=params_stocks, timeout=30)
    response.raise_for_status()
    payload = response.json()
    if "Time Series (Daily)" not in payload:
        # AlphaVantage can answer with a message (e.g. rate limit or invalid key) instead of data
        raise RuntimeError(f"No daily time series for {symbol}: {payload}")
    stocks.append({symbol: payload["Time Series (Daily)"]})

In [None]:
# Number of symbols fetched from the API
len(stocks)

In [None]:
# Development shortcut: replace `stocks` with a previously saved raw snapshot, which is expected to
# have the same structure (a list of {symbol: daily series} dicts).
# NOTE(review): this overrides the API results fetched above - skip this cell for a live run.
# The path is relative to the project folder instead of a machine-specific absolute path.
stocks_snapshot = Path().cwd().parents[0].joinpath("data/raw/stocks/20250806_174026_stocks.json")
with open(stocks_snapshot, encoding="utf-8") as s:
    stocks = json.load(s)
print(stocks)
print(type(stocks))

In [None]:
# Symbol(s) contained in the first entry of the loaded data
stocks[0].keys()

In [None]:
# AlphaVantage prefixes its field names with a running number; map them to plain column names
column_names = {"1. open": "open", "2. high": "high", "3. low": "low", "4. close": "close", "5. volume": "volume"}
price_columns = ["open", "high", "low", "close", "volume"]

dfs = []
for stock in stocks:
    for symbol, data in stock.items():
        # the payload maps each date to its fields: transpose so the dates become the index
        df = pd.DataFrame(data).T
        df = (
            df.set_index(pd.to_datetime(df.index))
            .apply(pd.to_numeric, errors="coerce")
            .rename(columns=column_names)
        )
        df["symbol"] = symbol
        df["split_on"] = df["symbol"]  # store() writes one HDF table per symbol
        if symbol in keys:
            # symbol already stored: keep only the first row
            # (presumably the most recent day, as the API lists it first - TODO confirm)
            df = pd.DataFrame(df.iloc[0, :]).T
            print(symbol, "exists")
        dfs.append(df)

df_stocks = pd.concat(dfs, axis=0)
# the single-row frames built above are object-typed, so cast the price columns back to floats
df_stocks[price_columns] = df_stocks[price_columns].astype("float64")
df_stocks

In [None]:
# Check that the price columns are float64 after the cast
df_stocks.dtypes

In [None]:
# One panel per ticker with the daily open and close prices.
# squeeze=False keeps `ax` a 2-D array even for a single ticker, so no placeholder column is needed.
fig, ax = plt.subplots(figsize=(24, 6), nrows=1, ncols=len(tickers), sharex=True, squeeze=False)
for idx, ticker in enumerate(tickers):
    df = df_stocks[df_stocks["symbol"] == ticker]
    panel = ax[0, idx]
    panel.set_title(ticker)
    panel.plot(df["open"], marker='.', label="Open")
    panel.plot(df["close"], marker='.', label="Close")
    panel.grid(True, axis="y", linestyle=":", linewidth=0.5)
    panel.spines[["top", "right", "bottom", "left"]].set_visible(False)
    panel.legend(loc="upper right")

# Earthquake data

Parameters according to the API documentation:

In [None]:
# URL of the USGS FDSN event web service; the operation is chosen via the "method" parameter below
url = r"https://earthquake.usgs.gov/fdsnws/event/1/"

In [None]:
# According to the API documentation all times are in UTC. The request times are built in German
# local time and sent as ISO 8601 strings, which carry the UTC offset.
berlin = ZoneInfo(key="Europe/Berlin")
tokyo = ZoneInfo(key="Asia/Tokyo")
now = datetime.now(tz=berlin).replace(microsecond=0)
# start of the query window - despite its name, five days before `now`
yesterday = now + timedelta(days=-5)

In [None]:
# Query parameters of the USGS event service
params = dict(
    method="query",  # submit a data request
    format="geojson",  # response format
    # Uncomment to restrict the search to a bounding box around Japan:
    # minlatitude=24.0, maxlatitude=46.0, minlongitude=122.0, maxlongitude=146,
    limit=100,  # return at most this many events
    starttime=yesterday.isoformat(),  # the API expects ISO 8601 timestamps
    endtime=now.isoformat(),
    orderby="time",  # most recent events first
)

In [None]:
# get the data from the API; fail loudly on HTTP errors instead of trying to decode an error page,
# and use a timeout so a stalled request cannot hang the notebook
response = requests.get(url, params=params, timeout=30)
response.raise_for_status()
earthquakes = response.json()

In [None]:
# One list per field; the keys are the names used in the API response so they can be looked up
# directly while iterating over the features:
#   time        - when the event occurred, in milliseconds since the epoch
#   mag         - magnitude of the event; interpret together with magType
#   magType     - magnitude type, described in the API documentation
#   alert, tsunami, place - as reported by the API
#   coordinates - read from the feature's geometry instead of its properties
records = {field: [] for field in ("time", "mag", "magType", "alert", "tsunami", "place", "coordinates")}

In [None]:
# Fill `records`: fields present in the feature's properties are read from there; any other
# field (the coordinates) is read from the feature's geometry.
for earthquake in earthquakes["features"]:
    properties = earthquake["properties"]
    for feature, values in records.items():
        source = properties if feature in properties else earthquake["geometry"]
        values.append(source[feature])

In [None]:
# Human-readable names for magnitude types.
# NOTE(review): not referenced in the visible cells, and the key capitalisation (e.g. "Mw", "MH")
# may not match the magType values actually returned by the API - confirm before using as a lookup.
mag_type_description = dict([
    ("Mw", "Moment Magnitude"),
    ("Ms", "Surface Wave Magnitude"),
    ("mb", "Body Wave Magnitude"),
    ("ml", "Local (Richter) Magnitude"),
    ("mb_lg", "Lg-Wave Magnitude"),
    ("md", "Duration Magnitude"),
    ("MH", "Hand-calculated Magnitude"),
    ("MI", "Intensity-derived Magnitude"),
    ("Me", "Energy Magnitude"),
    ("Mg", "Surface Wave from Ground Displacement"),
    ("MWb", "Moment Magnitude from Body Waves"),
    ("Mwr", "Regional Moment Magnitude"),
    ("MwC", "Centroid Moment Magnitude"),
    ("MwB", "Body-wave Derived Moment Magnitude"),
    ("mww", "Moment Magnitude from W-phase"),
])


In [None]:
# Tabular form of the earthquakes: split each [lon, lat, depth] coordinate triple into its own
# column, use descriptive column names and derive the key that splits the HDF store by day.
df = pd.DataFrame(records)
for position, column in enumerate(("lon", "lat", "depth")):
    df[column] = df["coordinates"].apply(lambda coord, i=position: coord[i])
df = (
    df.rename(columns={"time": "timestamp", "mag": "magnitude", "magType": "scale"})
    .drop(columns="coordinates")
)

# "time" is given in milliseconds since the epoch
df["timestamp"] = pd.to_datetime(df["timestamp"], unit="ms")
df["split_on"] = df["timestamp"].dt.strftime("date_%Y_%m_%d")  # one HDF table per day
df

In [None]:
# df has lon/lat columns but no "geometry" column, so build the point geometries from them (WGS84)
geodf = gpd.GeoDataFrame(df, geometry=gpd.points_from_xy(df["lon"], df["lat"]), crs="EPSG:4326")
geodf

In [None]:
# Interactive map of the earthquakes
geodf.explore()

# Data Storage

In [None]:
def store(client, df, split_on):
    """Append ``df`` to the development HDF5 datastore, one table per group.

    Rows are grouped by the values of ``split_on``; each group is appended to the table
    ``/{client}/data/{group}`` of ``datastore_dev.h5`` in the current working directory
    (opened in append mode).

    Parameters
    ----------
    client : str
        Top-level key of the data source, e.g. "weather", "stocks" or "earthquake".
    df : pandas.DataFrame
        Data to store.
    split_on : str
        Column whose values define the groups (and therefore the table names).

    Raises
    ------
    ValueError
        If ``split_on`` is not a column of ``df``.
    """
    # make sure there is a column on which it can be grouped on
    if split_on not in df.columns:
        raise ValueError(f"{split_on} not found in Dataframe columns: {df.columns}")

    # Reserve a minimum string width per column (the longest current entry, at least 15 characters)
    # to avoid min_size problems when storing string columns. This must be a per-column mapping:
    # a single number would apply one column's width to every column.
    min_itemsize = {}
    for col in df.select_dtypes(include=["object", "string"]).columns:
        longest = df[col].astype(str).map(len).max()  # NaN when df has no rows
        min_itemsize[col] = max(int(longest) if pd.notna(longest) else 0, 15)

    datastore_path = Path().cwd().joinpath("datastore_dev.h5")
    with pd.HDFStore(datastore_path, "a") as datastore:
        for group, data in df.groupby(split_on):
            datastore.append(
                f"/{client}/data/{group}",
                data,
                format="table",
                data_columns=True,
                min_itemsize=min_itemsize)

In [None]:
# Append the weather readings to the datastore, one table per city
store("weather", df_weather, "split_on")

In [None]:
# Append the earthquakes to the datastore, one table per day
store("earthquake", df, "split_on")

In [None]:
# Append the stock prices to the datastore, one table per symbol
store("stocks", df_stocks, "split_on")

In [None]:
# Load everything collected so far from the processed datastore. Tables live under
# "/<client>/data/<name>"; each list collects (name, DataFrame) pairs of one client.
datastore = Path().cwd().parents[0].joinpath("data/processed/datastore.h5")
weather = []
stocks = []
earthquakes = []
collected = {"weather": weather, "stocks": stocks, "earthquake": earthquakes}
with pd.HDFStore(datastore, "r") as ds:
    for key in ds.keys():
        parts = key.split("/")
        client = parts[1]
        if client not in collected or parts[2] != "data":
            continue
        frame = ds[key]
        if client == "weather":
            # the weather plots need a datetime index
            frame = frame.set_index("timestamp")
        collected[client].append((parts[-1], frame))

In [None]:
# Grid of weather panels, two per row: temperature (left y-axis) and humidity (right y-axis) per city.
ncols = 2
# ceiling division, so an odd number of cities still gets a panel for the last one
nrows = max(1, -(-len(weather) // ncols))
# squeeze=False keeps axs a 2-D array even when there is only one row
fig, axs = plt.subplots(figsize=(20,18), nrows=nrows, ncols=ncols, sharex=False, sharey=True, squeeze=False) # use figsize=(20,12) for report
temp_color = '#d62728'    # red for temperature
humidity_color = '#2ca02c'  # green for humidity
grid_color = '#e0e0e0' # gray for grid

panels = axs.flatten()
for ax, weather_data in zip(panels, weather):
    # extract the city name and the data
    city, data = weather_data
    # use a second y-axis for the humidity
    ax2 = ax.twinx()
    # plot the data
    ax.plot(data["temperature"], color=temp_color, alpha=0.9, label="Temperature")
    ax2.plot(data["humidity"], color=humidity_color, alpha=0.9, label="humidity")
    # remove the spines for each axis
    ax.spines[["top", "right"]].set_visible(False)
    ax2.spines[["top", "left"]].set_visible(False)

    # limits for the axis
    ax.set_ylim([0,40])
    ax2.set_ylim([0,100])

    # y-axis of the same color as its line, for better readability
    ax.spines['left'].set_color(temp_color)
    ax.spines['left'].set_linewidth(1.5)
    ax.spines['bottom'].set_color('gray')
    ax.spines['bottom'].set_linewidth(0.8)

    ax2.spines['right'].set_color(humidity_color)
    ax2.spines['right'].set_linewidth(1.5)

    # format the coordinate systems
    ax.set_title(city.replace("_", " "), fontsize=15, fontweight="normal", pad=15) #  some cities have a "_" in the name, remove it
    ax.set_ylabel("Temperature [°C]", color=temp_color, fontsize=14, fontweight='normal')
    ax2.set_ylabel("Humidity [%]", color=humidity_color, fontsize=14, fontweight='normal')

    ax.tick_params(axis='y', labelcolor=temp_color, colors=temp_color)
    ax2.tick_params(axis='y', labelcolor=humidity_color, colors=humidity_color)

    # x-axis must be formatted
    # make sure the DataFrames have a datetime index!
    ax.xaxis.set_major_formatter(mdates.DateFormatter('%m.%d %H:%M'))
    ax.xaxis.set_major_locator(mdates.HourLocator(interval=12))
    ax.xaxis.set_minor_locator(mdates.HourLocator(interval=6))
    plt.setp(ax.xaxis.get_majorticklabels(), rotation=45, ha='right', fontsize=12)
    plt.setp(ax.yaxis.get_majorticklabels(), fontsize=12)
    plt.setp(ax2.yaxis.get_majorticklabels(), fontsize=12)

    ax.grid(True, linestyle='-', linewidth=0.3, color=grid_color, which="major", alpha=1)
    ax.set_axisbelow(True)

    # remove the ticks
    ax.tick_params(axis='both', which="minor", length=0, pad=8, labelsize=10)
    ax2.tick_params(axis='both', which="minor", length=0, pad=8, labelsize=10)

# hide the panels left over when the number of cities is odd
for ax in panels[len(weather):]:
    ax.set_visible(False)

fig.suptitle("Weather data by city", fontsize=16)
plt.tight_layout(rect=[0, 0, 1, 0.96])
weather_plot_path = Path().cwd().parents[0].joinpath("utils/img/weather_plots.png")
weather_plot_path.parent.mkdir(parents=True, exist_ok=True)  # savefig does not create folders
fig.savefig(weather_plot_path)

In [None]:
# Combine the per-day earthquake tables into one GeoDataFrame and project it to Web Mercator
# (EPSG:3857, coordinates in metres) for the map below. points_from_xy builds all point geometries
# at once, instead of one shapely Point per row via a row-wise apply (which also breaks on empty frames).
quakes = pd.concat([frame for _, frame in earthquakes], ignore_index=True, axis=0)
quakes = gpd.GeoDataFrame(
    quakes,
    geometry=gpd.points_from_xy(quakes["lon"], quakes["lat"]),
    crs="EPSG:4326",
).to_crs("EPSG:3857")  # mercator projection
quakes

In [None]:
# World map of the earthquakes, coloured by depth - use mercator projection instead of plate carree
fig, ax = plt.subplots(figsize=(20,20), subplot_kw={"projection":ccrs.Mercator()})
ax.add_feature(cfeature.COASTLINE, linewidth=0.8, edgecolor='#2d3436', alpha=0.7)
ax.add_feature(cfeature.LAND, facecolor='#ddd6c1', alpha=0.3)
ax.add_feature(cfeature.OCEAN, color='#a8dadc', alpha=0.3)
ax.add_feature(cfeature.BORDERS, linewidth=0.5, edgecolor='#636e72', alpha=0.5)
# add the gridlines (labels on the bottom and left only)
gridliner = ax.gridlines(draw_labels=True, linewidth=0.5, alpha=0.5, color='gray', linestyle=':')
gridliner.top_labels = False
gridliner.right_labels = False
gridliner.xlabel_style = {"fontsize":14}
gridliner.ylabel_style = {"fontsize":14}

# colour according to the depth with a reversed colormap: the deeper the event, the darker the marker
colors = quakes["depth"]

# NOTE(review): the points are EPSG:3857 metres drawn in the axes' native Mercator coordinates;
# cartopy's Mercator() may use a different globe than EPSG:3857's sphere, so positions could be
# slightly off - plotting lon/lat with transform=ccrs.PlateCarree() would avoid this.
points = ax.scatter(quakes.geometry.x, quakes.geometry.y,
                    c=colors,
                    cmap="viridis_r",
                    alpha=0.4)

# build the colorbar from the scatter itself, so it always shows the same colormap and depth range
cbar = fig.colorbar(points, ax=ax, shrink=0.4, aspect=30, pad=0.02)
cbar.set_label("Depth (km)", fontsize=14)

ax.set_title('Earthquake Distribution by Depth (Mercator Projection)', fontsize=16, pad=20)
fig.tight_layout()
map_path = Path().cwd().parents[0].joinpath("utils/img/earthquakes_plot.png")
map_path.parent.mkdir(parents=True, exist_ok=True)  # savefig does not create folders
fig.savefig(map_path)

In [None]:
# Display names of the tracked symbols, used in the candlestick plot titles
symbols = {"NVDA": "Nvidia", "AAPL": "Apple", "META": "Meta", "RHM.DE": "Rheinmetall", "SPY": "S&P 500", "URTH": "MSCI World", "ACWI": "MSCI ACWI"}

In [None]:
# Candlestick chart per symbol, two panels per row.
ncols = 2
# ceiling division, so an odd number of symbols still gets a panel for the last one
nrows = max(1, -(-len(stocks) // ncols))
# squeeze=False keeps axs a 2-D array even when there is only one row
fig, axs = plt.subplots(figsize=(21,14), nrows=nrows, ncols=ncols, sharex=False, sharey=False, squeeze=False)

# Use the same colors as for the temperature
downcolor = '#d62728'    # red for price decrease
upcolor = '#2ca02c'  # green for price increase
grid_color = '#e0e0e0'  # gray for grid
width_candle = 1   # width of the candle body
width_wick = 0.5   # width of the wicks

panels = axs.flatten()
for ax, stocks_data in zip(panels, stocks):
    # extract the symbol and its price history
    symbol, data = stocks_data
    # chronological order, keeping one row per day in case a day was stored more than once
    data = data.sort_index()
    data = data[~data.index.duplicated()]
    up = data[data["close"] >= data["open"]]
    down = data[data["close"] < data["open"]]
    # fall back to the ticker itself for symbols without a display name
    ax.set_title(f"{symbols.get(symbol, symbol)} ({symbol})", fontsize=15, pad=15)
    # increasing prices: body from open up to close, wicks from close up to high and open down to low
    ax.bar(up.index, height=up["close"]-up["open"], width=width_candle, bottom=up["open"], color=upcolor) # body of the candle
    ax.bar(up.index, height=up["high"]-up["close"], width=width_wick, bottom=up["close"], color=upcolor) # upper wick of the candle
    ax.bar(up.index, height=up["low"]-up["open"], width=width_wick, bottom=up["open"], color=upcolor) # lower wick of the candle
    # decreasing prices: body from open down to close, wicks from open up to high and close down to low
    ax.bar(down.index, height=down["close"]-down["open"], width=width_candle, bottom=down["open"], color=downcolor) # body of the candle
    ax.bar(down.index, height=down["high"]-down["open"], width=width_wick, bottom=down["open"], color=downcolor) # upper wick of the candle
    ax.bar(down.index, height=down["low"]-down["close"], width=width_wick, bottom=down["close"], color=downcolor) # lower wick of the candle

    ax.spines[["top", "right"]].set_visible(False)
    # RHM.DE is quoted in euros, all other symbols in US dollars
    y_label = "Stock price (EUR)" if symbol == "RHM.DE" else "Stock price (USD)"
    ax.set_ylabel(y_label, fontsize=14, fontweight='normal')

    ax.xaxis.set_major_formatter(mdates.DateFormatter('%Y.%m.%d'))
    ax.xaxis.set_major_locator(mdates.DayLocator(interval=5))
    plt.setp(ax.xaxis.get_majorticklabels(), rotation=90, ha='center', fontsize=12)
    plt.setp(ax.yaxis.get_majorticklabels(), fontsize=12)

    ax.grid(True, linestyle='-', linewidth=0.3, color=grid_color, axis="y", which="major", alpha=1)

# hide the panels left over when the number of symbols is odd
for ax in panels[len(stocks):]:
    ax.set_visible(False)

fig.suptitle("Stock prices (Last 100 days)", fontsize=16)
plt.tight_layout(rect=[0, 0, 1, 0.96])
candles_path = Path().cwd().parents[0].joinpath("utils/img/candlestick_plots.png")
candles_path.parent.mkdir(parents=True, exist_ok=True)  # savefig does not create folders
fig.savefig(candles_path)