In [16]:
import pandas as pd
import sqlite3
from io import StringIO
import requests
import psycopg2
from sqlalchemy import create_engine
import os
import subprocess
import webbrowser

In [17]:
# Queries
create_table_query = '''
-- Drop schema, roles, and tables if they exist
DO $$

BEGIN
	IF EXISTS (SELECT 1 FROM pg_namespace WHERE nspname = 'api') THEN
		DROP SCHEMA IF EXISTS api CASCADE;
	END IF;

	IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'web_anon') THEN
		DROP ROLE IF EXISTS web_anon;
	END IF;

	IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'authenticator') THEN
		DROP ROLE IF EXISTS authenticator;
	END IF;
END $$;

-- Create schema
CREATE SCHEMA api;

-- Create roles
CREATE ROLE web_anon nologin;
CREATE ROLE authenticator noinherit login PASSWORD '1234';

-- Grant usage on schema to web_anon
GRANT USAGE ON SCHEMA api TO web_anon;

-- Create the table for government employment data
CREATE TABLE api.government_employment (
	series_id VARCHAR(24),
	year VARCHAR(24),
	period VARCHAR(24),
	value VARCHAR(24),
	footnote_code VARCHAR(24)
);

-- Create the table for women in government data
CREATE TABLE api.women_in_government (
	date VARCHAR(24),
	valueinthousands VARCHAR(24)
);

-- Create tables for 'totalnonfarm' and 'TotalPrivate_Employment'
CREATE TABLE api.totalnonfarm (
	series_id VARCHAR(16),
	year VARCHAR(16),
	period VARCHAR(16),
	value VARCHAR(16),
	footnote_code VARCHAR(16)
);

CREATE TABLE api.TotalPrivate_Employment (
	series_id VARCHAR(16),
	year VARCHAR(16),
	period VARCHAR(16),
	value VARCHAR(16),
	footnote_code VARCHAR(16)
);

-- Create the table for 'all_nonfarm' data
CREATE TABLE api.all_nonfarm (
	date VARCHAR(16),
	valueinthousands VARCHAR(16)
);

-- Create the table for 'prod_private' data
CREATE TABLE api.prod_private (
	date VARCHAR(16),
	valueinthousands VARCHAR(16)
);

-- Create the table for 'ratio' data
CREATE TABLE api.ratio (
	date VARCHAR(16),
	ratio VARCHAR(16)
);
'''

process_government_employment = '''
    -- Update the 'period' column in the government employment table
    UPDATE api.government_employment
    SET period = 
    CASE 
        WHEN period = 'M01' THEN 'January'
        WHEN period = 'M02' THEN 'Febrary'
        WHEN period = 'M03' THEN 'March'
        WHEN period = 'M04' THEN 'April'
        WHEN period = 'M05' THEN 'May'
        WHEN period = 'M06' THEN 'June'
        WHEN period = 'M07' THEN 'July'
        WHEN period = 'M08' THEN 'August'
        WHEN period = 'M09' THEN 'September'
        WHEN period = 'M10' THEN 'October'
        WHEN period = 'M11' THEN 'November'
        WHEN period = 'M12' THEN 'December'
        ELSE period
    END;

    -- Insert data into the 'women_in_government' table
    INSERT INTO api.women_in_government (date, valueinthousands)
    SELECT CONCAT(period, ' ', year), value
    FROM api.government_employment
    WHERE series_id = 'CES9000000010';
    '''

process_totalnonfarm = '''
    -- Update the 'period' column in the 'totalnonfarm' table
    UPDATE api.totalnonfarm
    SET period = 
    CASE 
        WHEN period = 'M01' THEN 'January'
        WHEN period = 'M02' THEN 'Febrary'
        WHEN period = 'M03' THEN 'March'
        WHEN period = 'M04' THEN 'April'
        WHEN period = 'M05' THEN 'May'
        WHEN period = 'M06' THEN 'June'
        WHEN period = 'M07' THEN 'July'
        WHEN period = 'M08' THEN 'August'
        WHEN period = 'M09' THEN 'September'
        WHEN period = 'M10' THEN 'October'
        WHEN period = 'M11' THEN 'November'
        WHEN period = 'M12' THEN 'December'
        ELSE period
    END;
    
    -- Insert data into the 'all_nonfarm' table
    INSERT INTO api.all_nonfarm (date, valueinthousands)
    SELECT CONCAT(period, ' ', year), value
    FROM api.totalnonfarm
    WHERE series_id = 'CES0000000001';
'''

process_totalprivate_employment = '''
    -- Update the 'period' column in the 'totalprivate_employment' table
    UPDATE api.totalprivate_employment
    SET period = 
    CASE 
        WHEN period = 'M01' THEN 'January'
        WHEN period = 'M02' THEN 'Febrary'
        WHEN period = 'M03' THEN 'March'
        WHEN period = 'M04' THEN 'April'
        WHEN period = 'M05' THEN 'May'
        WHEN period = 'M06' THEN 'June'
        WHEN period = 'M07' THEN 'July'
        WHEN period = 'M08' THEN 'August'
        WHEN period = 'M09' THEN 'September'
        WHEN period = 'M10' THEN 'October'
        WHEN period = 'M11' THEN 'November'
        WHEN period = 'M12' THEN 'December'
        ELSE period
    END;

    -- Insert data into the 'prod_private' table
    INSERT INTO api.prod_private (date, valueinthousands)
    SELECT CONCAT(period, ' ', year), value
    FROM api.totalprivate_employment
    WHERE series_id = 'CES0500000006';
'''

obtain_ratio = '''
    -- Insert data into the 'ratio' table
    INSERT INTO api.ratio (date, ratio)
    SELECT all_nonfarm.date, (
        (TRIM(prod_private.valueinthousands)::FLOAT) / 
        (TRIM(all_nonfarm.valueinthousands)::FLOAT - TRIM(prod_private.valueinthousands)::FLOAT)
    )::VARCHAR(16) AS ratio
    FROM api.all_nonfarm
    JOIN api.prod_private ON all_nonfarm.date = prod_private.date;
'''    

alter_datatype = '''
-- Alter the column type to NUMERIC
ALTER TABLE api.women_in_government
ALTER COLUMN valueinthousands TYPE NUMERIC USING (valueinthousands::NUMERIC);

-- Alter the column type to NUMERIC
ALTER TABLE api.women_in_government
ALTER COLUMN valueinthousands TYPE INTEGER USING (valueinthousands::INTEGER);

-- Alter the data type of 'ratio' in the 'ratio' table
ALTER TABLE api.ratio
ALTER COLUMN ratio TYPE NUMERIC(10,2) USING (ratio::NUMERIC);

-- Grant SELECT privileges on specific tables to web_anon
GRANT SELECT ON TABLE api.women_in_government TO web_anon;
GRANT SELECT ON TABLE api.ratio TO web_anon;

-- Grant web_anon role to authenticator
GRANT web_anon TO authenticator;
'''

In [18]:
# Function to stream and process data
def stream_and_process_data(url, engine, process_data, table_name, chunk_size=10000):
    
    # Use 'requests' to stream data from the URL
    # Define the custom headers
    headers = {
        'authority': 'www.google.com',
        'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
        'accept-language': 'en-US,en;q=0.9',
        'cache-control': 'max-age=0',
        'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36',
    }
    
    response = requests.get(url, stream=True, headers=headers)
    
    # Initialize an empty string to accumulate lines
    data_buffer = ""
    decoded_first_line=""
    for line in response.iter_lines():
        if line:
            if data_buffer=="" and decoded_first_line=="":
                decoded_first_line = line.decode('utf-8')
                data_buffer += decoded_first_line + "\n"
            elif data_buffer=="" and decoded_first_line!="":
                decoded_line = line.decode('utf-8')
                data_buffer += decoded_first_line + "\n"
                data_buffer += decoded_line + "\n"
            else:
                decoded_line = line.decode('utf-8')
                data_buffer += decoded_line + "\n"

            # Count the number of lines in the string
            line_count = data_buffer.count('\n') + 1
            
            # Check if the buffer size exceeds the chunk size
            if line_count >= chunk_size:
                # Convert the buffer to a Pandas DataFrame
                df_chunk = pd.read_csv(StringIO(data_buffer), delimiter='\t')
                
                # Step 1: Remove empty spaces from column names
                df_chunk.columns = df_chunk.columns.str.strip()
                df_chunk['series_id'] = df_chunk['series_id'].str.strip()

                # Step 2: Convert 'year' and 'value' columns to string
                df_chunk['year'] = df_chunk['year'].astype(str)
                df_chunk['value'] = df_chunk['value'].astype(str)

                # Step 3: Replace NaN values in 'footnote_codes' with an empty string
                df_chunk['footnote_codes'].fillna('', inplace=True)
                
                # Process the data
                
                df_chunk.to_sql(table_name, engine, if_exists='replace', index=False, schema='api')
                
                cursor.execute(process_data)
                conn.commit()

                # Reset the buffer
                line_count = 0
                data_buffer = ""
    
    if data_buffer != "":
        df_chunk = pd.read_csv(StringIO(data_buffer), delimiter='\t')
        
        # Step 1: Remove empty spaces from column names and series_id
        df_chunk.columns = df_chunk.columns.str.strip()
        df_chunk['series_id'] = df_chunk['series_id'].str.strip()
        
        # Step 2: Convert 'year' and 'value' columns to string
        df_chunk['year'] = df_chunk['year'].astype(str)
        df_chunk['value'] = df_chunk['value'].astype(str)

        # Step 3: Replace NaN values in 'footnote_codes' with an empty string
        df_chunk['footnote_codes'].fillna('', inplace=True)
        
        df_chunk.to_sql(table_name, engine, if_exists='replace', index=False, schema='api')
        
        cursor.execute(process_data)
        conn.commit()

In [19]:
# Main
# PostgreSQL connection details
pg_username = 'postgres'
pg_password = '1234'
pg_host = 'localhost'
pg_port = '5432'
pg_database = 'postgres'

# URL to the data
url_1 = 'https://download.bls.gov/pub/time.series/ce/ce.data.90a.Government.Employment'
url_2 = 'https://download.bls.gov/pub/time.series/ce/ce.data.00a.TotalNonfarm.Employment'
url_3 = 'https://download.bls.gov/pub/time.series/ce/ce.data.05a.TotalPrivate.Employment'

table_name_1 = 'government_employment'
table_name_2 = 'totalnonfarm'
table_name_3 = 'totalprivate_employment'


engine = create_engine(f'postgresql://{pg_username}:{pg_password}@{pg_host}:{pg_port}/{pg_database}')

# Create a PostgreSQL connection and a cursor
conn = psycopg2.connect(
    user=pg_username,
    password=pg_password,
    host=pg_host,
    port=pg_port,
    database=pg_database
)
cursor = conn.cursor()

cursor.execute(create_table_query)
conn.commit()

# Call the function to stream and process data
stream_and_process_data(url_1, engine, process_government_employment, table_name_1)
stream_and_process_data(url_2, engine, process_totalnonfarm, table_name_2)
stream_and_process_data(url_3, engine, process_totalprivate_employment, table_name_3)

cursor.execute(obtain_ratio)
conn.commit()

cursor.execute(alter_datatype)
conn.commit()

# Close the SQLite connection
conn.close()

In [20]:
notebook_directory = os.getcwd()
postgrest_path = notebook_directory + '\API'

# Specify the content of your .bat file
bat_content = fr'''
@echo off

rem go to directory where conf file is and start the PostgREST API
start cmd /k "cd {postgrest_path} && postgrest challenge.conf.txt"

rem Wait for the API to start (adjust timeout as needed)
timeout /t 5
'''

# Specify the path to save the .bat file
bat_file_path = fr'{notebook_directory}\startAPI.bat'

# Write the content to the .bat file
with open(bat_file_path, 'w') as bat_file:
    bat_file.write(bat_content)

result = subprocess.run(bat_file_path, shell=True)

In [21]:
# Open the first URL in the default web browser
webbrowser.open_new_tab('http://localhost:3000/women_in_government')
webbrowser.open_new_tab('http://localhost:3000/ratio')

True