In [0]:
# dbutils.secrets.listScopes()
# dbutils.secrets.list("DatabricksScope")

## Part 1
##### I've never used S3 so used Azure instead
##### Script this process so the files in the S3 bucket (Azure container) are kept in sync with the source when data on the website is updated, added, or deleted.

In [0]:
############################   configure ADLS storage
fileSystem = 'bls'
storageAccountName = dbutils.secrets.get(scope = "DatabricksScope", key = "StorageAccountName")
storageAccountKey = dbutils.secrets.get(scope = "DatabricksScope", key = "StorageAccountKey")
fullStorageAccountName =  "{0}.dfs.core.windows.net".format(storageAccountName)
# file_name = "pr.data.0.Current.txt"


#config storage accounts 
spark.conf.set(f"fs.azure.account.key.{storageAccountName}.dfs.core.windows.net", storageAccountKey)
adlsbasepath = f"abfss://{fileSystem}@{storageAccountName}.dfs.core.windows.net"
file_paths = adlsbasepath + "/input/"

# path_to_checkpoint = adlsbasepath + "/checkpoint/" + file_name

In [0]:
dbutils.fs.ls(adlsbasepath)

[FileInfo(path='abfss://bls@[REDACTED].dfs.core.windows.net/checkpoints/', name='checkpoints/', size=0, modificationTime=1751837491000),
 FileInfo(path='abfss://bls@[REDACTED].dfs.core.windows.net/delta/', name='delta/', size=0, modificationTime=1751837493000),
 FileInfo(path='abfss://bls@[REDACTED].dfs.core.windows.net/input/', name='input/', size=0, modificationTime=0),
 FileInfo(path='abfss://bls@[REDACTED].dfs.core.windows.net/output/', name='output/', size=0, modificationTime=0),
 FileInfo(path='abfss://bls@[REDACTED].dfs.core.windows.net/schemas/', name='schemas/', size=0, modificationTime=1751837493000)]

#### Part 1.1 Retrieve files dynamically from bls.gov site. Use User-Agent header to avoid 403 errors

In [0]:
import requests
import re

# BLS FTP base URL
bls_base_url = "https://download.bls.gov/pub/time.series/pr/"

### original file names - see below for scraping website to get dynamically
# files = [
#     "pr.class", "pr.contacts", "pr.data.0.Current", "pr.data.1.AllData",
#     "pr.duration", "pr.footnote", "pr.measure", "pr.period",
#     "pr.seasonal", "pr.sector", "pr.series", "pr.txt"
# ]

# User-Agent header to avoid 403 errors
headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"
}

def get_file_list(bls_base_url,headers):
    """Extract file names from the plain-text BLS directory listing."""
    response = requests.get(bls_base_url, headers=headers)
    response.raise_for_status()

    ### use regex to find files beginning with pr
    return re.findall(r'pr\.[\w\.]+', response.text)

files = get_file_list(bls_base_url,headers)
# print(files)

adls_uri = adlsbasepath + "/input/"

def stream_to_adls(file_name):
    url = bls_base_url + file_name
    try:
        response = requests.get(url, headers=headers)
        # print(response)
        response.raise_for_status()
        lines = response.text.splitlines()

        # Convert to Spark DataFrame
        df = spark.createDataFrame([(line,) for line in lines], ["value"])

        # Write to ADLS
        df.write.mode("overwrite") \
        .option("header", "false") \
        .option("delimiter","\t") \
        .csv(f"{adls_uri}/{file_name}") 
        print(f"✅ Retrieved: {file_name}")  ### I got this part from copilot hee hee
    except Exception as e:
        print(f"❌ Failed to retrieve {file_name}: {e}")

# Run for all files in files
for file in files:
    stream_to_adls(file)

✅ Retrieved: pr.class
✅ Retrieved: pr.class
✅ Retrieved: pr.contacts
✅ Retrieved: pr.contacts
✅ Retrieved: pr.data.0.Current
✅ Retrieved: pr.data.0.Current
✅ Retrieved: pr.data.1.AllData
✅ Retrieved: pr.data.1.AllData
✅ Retrieved: pr.duration
✅ Retrieved: pr.duration
✅ Retrieved: pr.footnote
✅ Retrieved: pr.footnote
✅ Retrieved: pr.measure
✅ Retrieved: pr.measure
✅ Retrieved: pr.period
✅ Retrieved: pr.period
✅ Retrieved: pr.seasonal
✅ Retrieved: pr.seasonal
✅ Retrieved: pr.sector
✅ Retrieved: pr.sector
✅ Retrieved: pr.series
✅ Retrieved: pr.series
✅ Retrieved: pr.txt
✅ Retrieved: pr.txt


#### There's nothing I can find from bls that identifies a change in files so use autoloader for the syncing part; will use input folder in place of raw, delta folder for delta tables, schemas mandatory. This can then go on a schedule of some type to check bls files. Also some files have headers and some don't.....

In [0]:
from pyspark.sql.types import *

#### to determine schema for file
def get_bls_schema(file_name):
    """Return the appropriate schema for a given BLS file."""
    schemas = {
        "pr.data.1.AllData": StructType([
            StructField("series_id", StringType()),
            StructField("year", IntegerType()),
            StructField("period", StringType()),
            StructField("value", StringType()),
            StructField("footnote_codes", StringType())
        ]),
        "pr.data.0.Current": StructType([
            StructField("series_id", StringType()),
            StructField("year", IntegerType()),
            StructField("period", StringType()),
            StructField("value", DoubleType()),
            StructField("footnote_codes", StringType())
        ]),
        "pr.series": StructType([
            StructField("series_id", StringType()),
            StructField("area_code", StringType()),
            StructField("measure_code", StringType()),
            StructField("seasonal", StringType()),
            StructField("industry_code", StringType()),
            StructField("occupation_code", StringType()),
            StructField("class_code", StringType())
        ]),
        "pr.measure": StructType([
            StructField("measure_code", StringType()),
            StructField("measure_text", StringType())
        ]),
        "pr.class": StructType([
            StructField("class_code", StringType()),
            StructField("class_text", StringType())
        ]),
        "pr.duration": StructType([
            StructField("duration_code", StringType()),
            StructField("duration_text", StringType())
        ])
    }
    return schemas.get(file_name, None)

##### some files didn't correspond to any schemas. In real life....I would investigate further.
##### But the files we need for subsequent steps have schemas so I just kept going.....

In [0]:
from pyspark.sql.types import *

#### function that takes file_name and schema and writes to delta table, triggers once
def autoload_bls_file(file_name, schema):
    input_path = adlsbasepath + f"/input/{file_name}"
    checkpoint_path = adlsbasepath + "/checkpoints/bls/{file_name}"
    delta_table_name = f"bls_{file_name.replace('.', '_').lower()}"
    delta_path = adlsbasepath + f"/delta/{delta_table_name}"

    df = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("cloudFiles.schemaLocation", adlsbasepath + "/schemas/bls/" + file_name)
    .option("delimiter", "\t")
    .option("header", "false")
    .schema(schema)  # Use .schema(), not .option("schema", ...)
    .load(input_path)
)
    query = df.writeStream \
        .format("delta") \
        .outputMode("append") \
        .option("checkpointLocation", checkpoint_path) \
        .trigger(availableNow=True) \
        .start(delta_path)

    query.awaitTermination()

    print(f"✅ Auto Loader triggered for {file_name} → {delta_table_name}")

### run for each file/schema

for file_name in files:
    schema = get_bls_schema(file_name)
    if not schema:
        print(f"No schema defined for file: {file_name}")  
    else:
        autoload_bls_file(file_name, schema)


✅ Auto Loader triggered for pr.class → bls_pr_class
✅ Auto Loader triggered for pr.class → bls_pr_class
No schema defined for file: pr.contacts
No schema defined for file: pr.contacts
✅ Auto Loader triggered for pr.data.0.Current → bls_pr_data_0_current
✅ Auto Loader triggered for pr.data.0.Current → bls_pr_data_0_current
✅ Auto Loader triggered for pr.data.1.AllData → bls_pr_data_1_alldata
✅ Auto Loader triggered for pr.data.1.AllData → bls_pr_data_1_alldata
✅ Auto Loader triggered for pr.duration → bls_pr_duration
✅ Auto Loader triggered for pr.duration → bls_pr_duration
No schema defined for file: pr.footnote
No schema defined for file: pr.footnote
✅ Auto Loader triggered for pr.measure → bls_pr_measure
✅ Auto Loader triggered for pr.measure → bls_pr_measure
No schema defined for file: pr.period
No schema defined for file: pr.period
No schema defined for file: pr.seasonal
No schema defined for file: pr.seasonal
No schema defined for file: pr.sector
No schema defined for file: pr.sec

In [0]:
# for stream in spark.streams.active:
#     stream.stop()

In [0]:
# dbutils.fs.rm(adlsbasepath + "/checkpoints", recurse=True)
# dbutils.fs.rm(adlsbasepath+ "/schemas/bls", recurse=True)

#  Part 2
##### read from API and save population data in S3 (Azure blob) as a json file
##### save in: output/population.json
##### did this first and then made an azure function out of it for the terraform script......
##### This was working GREAT....and then it started throwing the 404 error.....

In [0]:
import requests
import json
import pandas as pd

population_url = "https://datausa.io/api/data?drilldowns=Nation&measures=Population"

try:
    response = requests.get(population_url)
    response.raise_for_status()
    population_data = response.json()
    population_string = json.dumps(population_data) ### make a string out of the json object
    
    # Convert to list of dicts
    population_list = [json.loads(population_string)]

    # Create pandas DataFrame
    population_pdf = pd.DataFrame(population_list)

    #### create spark df; write to blob as json file
    population_df = spark.createDataFrame(population_pdf)
    population_df.write.mode("overwrite").format("json").save(adlsbasepath + "/output/population.json")
except Exception as e:
    print(f"Error: {e}")



Error: 404 Client Error: Not Found for url: https://datausa.io/api/data?drilldowns=Nation&measures=Population


# Part 3
##### Read pr.data.0.Current and json file into dataframes
##### from population data, generate the mean and the standard deviation of the annual US population across the years [2013, 2018] inclusive. Mean = sum of values divided by number of values. 
##### using stddev() function for standard deviation
##### nothing says: don't use tempviews, so I used tempviews after making dfs and wrote SQL

In [0]:
##### delimiter/schema not working after adding autoloader above to "automate" the reading of the files
##### parsed the data in the next cell.....
spark.read.text(adlsbasepath + "/input/pr.data.0.Current").show(truncate=False)


+--------------------------------------------+
|value                                       |
+--------------------------------------------+
|"PRS85006162      \t1996\tQ03\t         2.3"|
|"PRS85006162      \t1996\tQ04\t         1.1"|
|"PRS85006162      \t1996\tQ05\t         2.1"|
|"PRS85006162      \t1997\tQ01\t        -0.5"|
|"PRS85006162      \t1997\tQ02\t         5.7"|
|"PRS85006162      \t1997\tQ03\t         3.8"|
|"PRS85006162      \t1997\tQ04\t         1.3"|
|"PRS85006162      \t1997\tQ05\t         2.3"|
|"PRS85006162      \t1998\tQ01\t         4.1"|
|"PRS85006162      \t1998\tQ02\t         1.4"|
|"PRS85006162      \t1998\tQ03\t         4.3"|
|"PRS85006162      \t1998\tQ04\t         6.2"|
|"PRS85006162      \t1998\tQ05\t         3.3"|
|"PRS85006162      \t1999\tQ01\t         4.3"|
|"PRS85006162      \t1999\tQ02\t         1.1"|
|"PRS85006162      \t1999\tQ03\t         4.1"|
|"PRS85006162      \t1999\tQ04\t         6.0"|
|"PRS85006162      \t1999\tQ05\t         3.9"|
|"PRS85006162

In [0]:
##### read data into dataframes
from pyspark.sql import functions as F
from pyspark.sql.functions import col, regexp_replace

population_df = spark.read.format("json").load(adlsbasepath + "/output/population.json")
population_df.createOrReplaceTempView("population_tv")

#### after adding the autoloader above...input files needed parsing...this should be investigated

df_raw = spark.read.text(adlsbasepath + "/input/pr.data.0.Current")
df_split = df_raw.selectExpr(
    "split(value, '\\t')[0] as series_id",
    "int(split(value, '\\t')[1]) as year",
    "split(value, '\\t')[2] as period",
    "trim(split(value, '\\t')[3]) as value",
    "trim(split(value, '\\t')[4]) as footnote_codes"
)

# Remove quotes from all string columns
df_clean = df_split.select([
    regexp_replace(col(c), '"', '').alias(c) if df_split.schema[c].dataType.simpleString() == 'string' else col(c)
    for c in df_split.columns
])

#### remove spaces from column names
bls_clean_df = df_clean.select([F.col(c).alias(c.strip()) for c in df_clean.columns])
bls_clean_df.createOrReplaceTempView("bls_current_tv")

In [0]:
%sql
SELECT SUM(item.population) / 6 AS populationMean
-- , AVG(item.population) AS populationAvg
, stddev(item.population) AS populationStdDev
FROM population_tv
JOIN LATERAL explode(data) AS t(item)
WHERE item.`ID Year` BETWEEN 2013 AND 2018

populationMean,populationStdDev
317437383.0,4257089.54152933


#####  Part 3.2 For every series_id, find the best year: the year with the max/largest sum of "value" for all quarters in that year. (all quarters in that year is the same thing as all for that year. Unless you have funny calendar math and a quarter straddles two years....) Generate a report with each series id, the best year for that series, and the summed value for that year. Made a table report.

In [0]:
%sql
;with cte_maxvalue as (
SELECT series_id, year, SUM(value) AS valueSum
FROM bls_current_tv
GROUP BY  series_id, year
)
SELECT TRIM(b.series_id) AS series_id
, year
, b.maxValue AS value
FROM (
SELECT series_id, MAX(valueSum) AS maxValue
FROM (
SELECT series_id, year, SUM(value) AS valueSum
FROM bls_current_tv
-- WHERE trim(series_id) = 'PRS30006011'
GROUP BY  series_id, year
)a
GROUP BY a.series_id
)b
INNER JOIN
( 
SELECT *
FROM cte_maxvalue) cte
ON TRIM(cte.series_id)= TRIM(b.series_id)
AND cte.valueSum = b.maxValue
ORDER BY cte.series_id, cte.year

----series_id, year, period,value,footnote_codes

series_id,year,value
PRS30006011,2022,20.5
PRS30006012,2022,17.1
PRS30006013,1998,705.895
PRS30006021,2010,17.7
PRS30006022,2010,12.4
PRS30006023,2014,503.21600000000007
PRS30006031,2022,20.5
PRS30006032,2021,17.1
PRS30006033,1998,702.672
PRS30006061,2022,37.0


Databricks visualization. Run in Databricks to view.

In [0]:
# %sql
# SELECT *
# FROM bls_current_tv
# WHERE TRIM(series_id) = 'PRS30006032'
# AND TRIM(period) = 'Q01'
# AND TRIM(year) = '2018'

# returns different values than what's in git repo.....but works for data that's there now

[0;31m---------------------------------------------------------------------------[0m
[0;31mJSONDecodeError[0m                           Traceback (most recent call last)
File [0;32m/databricks/python/lib/python3.11/site-packages/requests/models.py:971[0m, in [0;36mResponse.json[0;34m(self, **kwargs)[0m
[1;32m    970[0m [38;5;28;01mtry[39;00m:
[0;32m--> 971[0m     [38;5;28;01mreturn[39;00m complexjson[38;5;241m.[39mloads([38;5;28mself[39m[38;5;241m.[39mtext, [38;5;241m*[39m[38;5;241m*[39mkwargs)
[1;32m    972[0m [38;5;28;01mexcept[39;00m JSONDecodeError [38;5;28;01mas[39;00m e:
[1;32m    973[0m     [38;5;66;03m# Catch JSON-related errors and raise as requests.JSONDecodeError[39;00m
[1;32m    974[0m     [38;5;66;03m# This aliases json.JSONDecodeError and simplejson.JSONDecodeError[39;00m

File [0;32m/usr/lib/python3.11/json/__init__.py:346[0m, in [0;36mloads[0;34m(s, cls, object_hook, parse_float, parse_int, parse_constant, object_pairs_hook, 

#####  Part 3.3 generate a report that will provide the value for series_id = PRS30006032 and period = Q01 and the population for that given year -- so year is a parameter? Added parameter year and to WHERE clause. Also one row doesn't make a very good report; made a table report and also a dashboard via the icon above
##### See dashboard report that shows the table with values for series_id = PRS30006032 and period = Q01 and the population, where available

In [0]:
%sql
;with cte_population AS (
SELECT TRIM(item.`ID Year`) AS year, item.population
FROM population_tv
JOIN LATERAL explode(data) AS t(item)
)
SELECT series_id, cte.year, period, value, population
FROM (
SELECT TRIM(series_id) AS series_id, TRIM(year) AS year,period, value
FROM bls_current_tv
WHERE TRIM(series_id) = 'PRS30006032'
AND TRIM(period) = 'Q01'
) a 
INNER JOIN cte_population cte 
ON a.year = cte.year
AND cte.year = :year  --- takes year parameter
ORDER BY a.year


series_id,year,period,value,population
PRS30006032,2016,Q01,-1.4,318558162


Databricks visualization. Run in Databricks to view.