In [4]:
import io
import polars as pl
import pandas as pd
import httpx
from typing import List, Mapping, Tuple, Union

# Set width of column for tables in polars
pl.Config.set_fmt_str_lengths(100)


polars.config.Config

# 1. Get All CES Series using Polars
- Polars is used instead; Pandas library execution would take signifinicant longer time especially in the read operation

In [162]:
# Set global variables
HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3",
    "Accept-Language": "en-US,en;q=0.9",
}
BLS_PATH_URL = "https://download.bls.gov/pub/time.series"

In [158]:
# Helper functions
def get_response_from_url(url: str, headers: Mapping[str, str]) -> str:
    response = httpx.get(url, headers=headers)
    response.raise_for_status()
    return response.text

def build_url(id: str) -> str:
    # The key to access metadata for ALL CES Series
    key = "ce.data.0.AllCESSeries"
    return f"{BLS_PATH_URL}/{id}/{key}"

def parse_csv(obj: str) -> pl.DataFrame:
    # Parsing CSV with pandas pd.read_csv takes longer time, using polars for faster read operation
    data = pl.read_csv(io.StringIO(obj), separator="\t", infer_schema_length=False)
    data.columns = [col.strip() for col in data.columns]
    return data

def map_ce_series_name(data: pl.DataFrame, series_id: str) -> pl.DataFrame:
    # Get the ce series name mapping
    obj = get_response_from_url(
        f"{BLS_PATH_URL}/{series_id}/ce.series",
        headers=HEADERS,
    )
    # Parse the series id to series name mapping
    ce_series = (
        parse_csv(obj)
        .with_columns(pl.col("series_id").str.strip())
        .select(["series_id", "series_title"])
    )
    series_id_to_name = {}
    for row in ce_series.iter_rows():
        series_id_to_name[row[0]] = row[1]

    data = (
        data.with_columns(
            pl.col("series_id").map_dict(series_id_to_name).alias("series_name")
        )
        .with_columns(
            pl.col("series_name")
            .str.replace_all(" ", "_")
            .str.replace_all(",", "")
            .str.replace_all("-", "_")
            .str.replace_all("not_seasonally_adjusted", "nsa")
            .str.replace_all("seasonally_adjusted", "sa")
            .str.to_lowercase()
        )
        .select(
            pl.col("series_name"),
            pl.all().exclude(["series_name", "series_id"]),
        )
    )
    return data

In [163]:
def extractor_all_series(series_id: str = "ce") -> pl.DataFrame:
    """
    Extracts time series data for the given series ID from US Labour of Statistics.

    Parameters:
        series_id (str): The series ID for which to extract time series data.

    Returns:
        pl.DataFrame: A Polars DataFrame containing the extracted time series data.
        
    This function retrieves time series data for the specified series ID using the
    `get_response_from_url()` function to fetch the data from a URL. The retrieved
    data is then parsed into a Polars DataFrame using the `parse_csv()` function.
    
    The function proceeds to perform the following data transformations:
    1. Removes rows with "M13" as the period.
    2. Parses the "year" and "period" columns and creates a new "time" column with datetime format.
    3. Selects the "series_id", "time", and "value" columns.
    4. Shrinks the data types of columns to optimize memory usage.
    
    If the `series_id` is "ce", the function further processes the data using the
    `map_ce_series_name()` function to map the series names based on the specified series ID.
    
    The resulting Polars DataFrame contains the extracted time series data ready for analysis.
    """
    obj = get_response_from_url(
        build_url(series_id),
        headers=HEADERS,
    )
    raw_data = parse_csv(obj)
    
    data = (
        raw_data
        # Remove M13 as period
        .filter(pl.col("period") != "M13")
        # Parse datetime col
        .with_columns(
            [
                (pl.col("year") + pl.col("period"))
                .alias("time")
                .str.to_datetime("%YM%m", strict=False),
            ]
        )
        .select(
            [
                pl.col("series_id").str.strip(),
                pl.col("time"),
                pl.col("value").str.strip().cast(pl.Float64),
            ]
        )
        .select(pl.all().shrink_dtype())
    )

    if series_id == "ce":
        data = map_ce_series_name(data=data, series_id=series_id)

    return data

In [164]:
bls_data = extractor_all_series("ce")

In [165]:
print(f"Total rows for all CES Series: {len(bls_data)}")
print(f"Snapshot of the data extracted: {bls_data.head()}")

Total rows for all CES Series: 7895859
Snapshot of the data extracted: shape: (5, 3)
┌──────────────────────────────────────────┬─────────────────────┬─────────┐
│ series_name                              ┆ time                ┆ value   │
│ ---                                      ┆ ---                 ┆ ---     │
│ str                                      ┆ datetime[μs]        ┆ f32     │
╞══════════════════════════════════════════╪═════════════════════╪═════════╡
│ all_employees_thousands_total_nonfarm_sa ┆ 1939-01-01 00:00:00 ┆ 29923.0 │
│ all_employees_thousands_total_nonfarm_sa ┆ 1939-02-01 00:00:00 ┆ 30100.0 │
│ all_employees_thousands_total_nonfarm_sa ┆ 1939-03-01 00:00:00 ┆ 30280.0 │
│ all_employees_thousands_total_nonfarm_sa ┆ 1939-04-01 00:00:00 ┆ 30094.0 │
│ all_employees_thousands_total_nonfarm_sa ┆ 1939-05-01 00:00:00 ┆ 30299.0 │
└──────────────────────────────────────────┴─────────────────────┴─────────┘


# 2. Extract one of the popular time series using CES Public Open API using Pandas
- https://data.bls.gov/cgi-bin/surveymost?bls

In [5]:
import requests
import json
import pandas as pd
import re

In [6]:
# Helper functions
def process_series_data(series):
    data_rows = []
    seriesId = series["seriesID"]
    for item in series["data"]:
        year = item["year"]
        period = item["period"]
        value = item["value"]
        footnotes = ""
        for footnote in item["footnotes"]:
            if footnote:
                footnotes = footnotes + footnote["text"] + ","

        if "M01" <= period <= "M12":
            data_rows.append([seriesId, year, period, value, footnotes[0:-1]])

    return data_rows

def json_to_dataframe(json_data) -> pd.DataFrame:
    data_rows = []
    for series in json_data["Results"]["series"]:
        data_rows.extend(process_series_data(series))

    df = pd.DataFrame(
        data_rows, columns=["series id", "year", "period", "value", "footnotes"]
    )
    return df


def parse_datetime(df: pd.DataFrame) -> pd.DataFrame:
    df["time"] = pd.to_datetime(df["year"] + "-" + df["period"].str[-2:] + "-01")
    return df


def to_snakecase(text) -> str:
    return text.lower().replace(" ", "_")


def remove_special_characters(text)-> str:
    # Remove parentheses, hyphens, and square brackets
    cleaned_text = re.sub(r"[\(\)\-\[\]]", "", text)
    return cleaned_text

In [22]:
# Main extractor using v1 API 
def get_bls_series(series_id_to_name: Mapping[str, str]) -> pd.DataFrame:
    """
    Retrieves time series data for the specified series IDs from the BLS API.

    Parameters:
        series_id_to_name (Mapping[str, str]): A mapping of series IDs to their corresponding names.
                                              The keys are series IDs (str), and the values are
                                              the respective series names (str).

    Returns:
        pd.DataFrame: A pandas DataFrame containing the extracted time series data.

    This function uses the BLS API to fetch time series data for multiple series IDs based on the
    provided mapping of series IDs to names. The function performs the following steps to obtain
    and preprocess the data:

    1. Prepares the request data as a JSON string with the specified series IDs, start year, and end year.
    2. Sends a POST request to the BLS API to fetch the time series data in JSON format.
    3. Converts the JSON data to a pandas DataFrame using the json_to_dataframe function.
    4. Preprocesses the DataFrame by parsing the datetime, replacing series IDs with series names,
       converting column names to snake_case, renaming the "series_id" column to "series_name",
       converting series_name values to snakecase, and removing special characters.
    5. Reshapes the DataFrame to keep only the desired columns ("series_name", "time", "value"),
       and sets "series_name" as the index of the DataFrame.

    The provided mapping series_id_to_name allows you to retrieve data for multiple series IDs
    simultaneously. The resulting DataFrame contains the time series data ready for further analysis
    or visualization.
    """
    headers = {"Content-type": "application/json"}
    # Prepare the request data as a JSON string
    # NOTE: The start year and end year is set at 2013 and 2023 respectively
    data = json.dumps(
        {"seriesid": list(series_id_to_name.keys()), "startyear": "2013", "endyear": "2023"}
    )
    # Send a POST request to the BLS API to get the JSON data
    p = requests.post(
        "https://api.bls.gov/publicAPI/v1/timeseries/data/", data=data, headers=headers
    )
    # Load the json data
    json_data = json.loads(p.text)
    # Convert json_data to pd.DataFrame
    df = json_to_dataframe(json_data)
    
    # Preprocessing data into desired format
    data = (
        # Parse datetime to YY-MM-DD format
        parse_datetime(df)
        # Replace series id with series name
        .replace({"series id": series_id_to_name})
        # Make all column name to snake_case
        .rename(columns=lambda x: to_snakecase(x))
        # Replace name for column "series_id" to "series_name"
        .rename(columns={"series_id": "series_name"})
        # Turn all values in series_name to snakecase and remove special characters
        .assign(
            series_id=lambda x: x["series_name"]
            .apply(to_snakecase)
            .apply(remove_special_characters)
        )
        # Reshape df to desired features only
        .pipe(lambda df: df.loc[:, ["series_name", "time", "value"]])
        .set_index("series_name")
    )
    return data

In [12]:
# Set series id and series name 
SERIES_ID_TO_NAME = {"LNS14000000": "Unemployment Rate (Seasonally Adjusted)"}
# Extract time series using Public Data V1 API
bls_series = get_bls_series(series_id_to_name=SERIES_ID_TO_NAME)

In [13]:
print(f"Total rows for time series since 2013: {len(bls_series)}")
print(bls_series.dtypes)
print(f"Snapshot of the data extracted: {bls_single_series}")

Total rows for time series since 2013: 120
time     datetime64[ns]
value            object
dtype: object
Snapshot of the data extracted:                                               time value
series_name                                             
Unemployment Rate (Seasonally Adjusted) 2022-12-01   3.5
Unemployment Rate (Seasonally Adjusted) 2022-11-01   3.6
Unemployment Rate (Seasonally Adjusted) 2022-10-01   3.7
Unemployment Rate (Seasonally Adjusted) 2022-09-01   3.5
Unemployment Rate (Seasonally Adjusted) 2022-08-01   3.7
...                                            ...   ...
Unemployment Rate (Seasonally Adjusted) 2013-05-01   7.5
Unemployment Rate (Seasonally Adjusted) 2013-04-01   7.6
Unemployment Rate (Seasonally Adjusted) 2013-03-01   7.5
Unemployment Rate (Seasonally Adjusted) 2013-02-01   7.7
Unemployment Rate (Seasonally Adjusted) 2013-01-01   8.0

[120 rows x 2 columns]
