In [0]:
import urllib.request
import re
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime

# Notebook parameters, declared as (widget name, default value, label) and
# registered in this order.
for _widgetName, _widgetDefault, _widgetLabel in (
    ('catalog', 'rearcquest', 'Catalog'),
    ('schema', 'raw', 'Schema'),
    ('volume', 'bls_data', 'Volume'),
    ('sourceUrl', 'https://download.bls.gov/pub/time.series/pr/', 'Source URL'),
):
    dbutils.widgets.text(_widgetName, _widgetDefault, _widgetLabel)

# Read the current widget values back into the module-level names used by later cells.
catalog, schema, volume, baseUrl = (
    dbutils.widgets.get(_key) for _key in ('catalog', 'schema', 'volume', 'sourceUrl')
)

# Root folder of the target Volume; the trailing slash lets a filename be appended directly.
targetVolume = "/Volumes/{}/{}/{}/".format(catalog, schema, volume)

In [0]:
%run ./Utils

In [0]:
def convert_tab_file_to_table(filename, targetTable):
    """Load a tab-delimited file with Spark and save it as a Delta table.

    The file is read with a header row and schema inference. Column names are
    stripped of surrounding whitespace and have spaces and hyphens replaced by
    underscores before the table is written in overwrite mode.

    Args:
        filename: Path of the tab-delimited file to read.
        targetTable: Name of the table to write.

    Returns:
        targetTable, unchanged.

    Raises:
        Exception: Any failure, re-raised with the filename in the message.
    """
    try:
        # Configure the reader one option at a time
        reader = spark.read
        for optionName, optionValue in (
            ("delimiter", "\t"),
            ("header", "true"),
            ("inferSchema", "true"),
        ):
            reader = reader.option(optionName, optionValue)
        rawDf = reader.csv(filename)

        # Normalise headers: trim, then turn spaces and hyphens into underscores
        cleanedNames = [
            name.strip().replace(' ', '_').replace('-', '_')
            for name in rawDf.columns
        ]
        tableDf = rawDf.toDF(*cleanedNames)

        # Replace the contents of the Delta table
        writer = tableDf.write.format("delta").mode("overwrite")
        writer.saveAsTable(targetTable)
    except Exception as e:
        raise Exception(f"Failed to convert file to table: {filename}; {str(e)}")
    return targetTable

def _http_date_to_epoch_ms(httpDate):
    """Convert an HTTP date header value (e.g. Last-Modified) to epoch milliseconds.

    Returns None when the value is missing or cannot be parsed.
    """
    from datetime import timezone
    from email.utils import parsedate_to_datetime

    if not httpDate:
        return None
    try:
        parsed = parsedate_to_datetime(httpDate)
    except (TypeError, ValueError):
        return None
    if parsed.tzinfo is None:
        # Never fall back to the machine's local timezone for an HTTP date
        parsed = parsed.replace(tzinfo=timezone.utc)
    return int(parsed.timestamp() * 1000)


def _volume_copy_is_current(targetFolder, filename, sourceTimestampMs):
    """Return True when targetFolder holds `filename` modified at or after the source time."""
    if sourceTimestampMs is None:
        # Freshness cannot be established without a source timestamp
        return False
    try:
        existingFiles = dbutils.fs.ls(targetFolder)
    except Exception:
        # Best effort: the folder may not exist or may not be listable yet
        return False
    return any(
        f.name == filename and f.modificationTime >= sourceTimestampMs
        for f in existingFiles
    )


def import_file(baseUrl, targetFolder, filename):
    """Copy one source file into the Volume and refresh its Delta table.

    The file is downloaded and written only when the Volume has no copy
    modified at or after the source's Last-Modified time. The table is rebuilt
    when the file was refreshed or when the table does not exist yet.
    'pr.txt' is never converted to a table.

    Reads the module-level `catalog` and `schema` to build the table name
    `{catalog}.{schema}.{filename with '.' replaced by '_'}`.

    Args:
        baseUrl: Source folder URL; the filename is appended to it as-is.
        targetFolder: Volume folder path; the filename is appended to it as-is.
        filename: Name of the file to import.

    Returns:
        filename, whether the file was imported or skipped.

    Raises:
        Exception: Any failure, re-raised with the filename in the message and
            the original exception chained as its cause.
    """
    try:
        fileUrl = baseUrl + filename
        volumePath = targetFolder + filename

        # Fetch file from source with User-Agent header to comply with BLS policies
        req = urllib.request.Request(fileUrl, headers={'User-Agent': 'your-email@example.com'})
        with urllib.request.urlopen(req) as response:
            sourceTimestampMs = _http_date_to_epoch_ms(response.headers.get("Last-Modified"))
            fileIsCurrent = _volume_copy_is_current(targetFolder, filename, sourceTimestampMs)
            # The body is only read when the Volume copy is missing or stale
            fileContent = None if fileIsCurrent else response.read()

        if not fileIsCurrent:
            # Write file to Volume using dbutils
            dbutils.fs.put(volumePath, fileContent.decode('latin-1'), overwrite=True)

        # hardcoding a passby for pr.txt for now; TODO: detect actual data vs readmes
        tableIsCurrent = True
        if filename != 'pr.txt':
            tableName = f"{catalog}.{schema}.{filename.replace('.', '_')}"
            # An unchanged file still needs converting if its table is missing
            # (for example after an earlier conversion failure)
            tableIsCurrent = fileIsCurrent and spark.catalog.tableExists(tableName)
            if not tableIsCurrent:
                convert_tab_file_to_table(volumePath, targetTable=tableName)

        if fileIsCurrent and tableIsCurrent:
            print(f"Skipped up-to-date file: {filename}")
        else:
            print(f"Imported file: {filename}")
        return filename
    except Exception as e:
        raise Exception(f"Failed to import file: {filename}; {str(e)}") from e

In [0]:
def create_index_html_file(targetFolder):
    """Write an index.html that links to every file in a Volume folder.

    Entries are listed in sorted order and the index file itself is left out,
    so repeated runs over the same folder produce the same page. Names are
    URL-quoted in the link target and HTML-escaped in the link text.

    Args:
        targetFolder: Volume folder path; 'index.html' is appended to it as-is.

    Raises:
        Exception: Any failure, re-raised with the original exception chained
            as its cause.
    """
    # Imported locally: a later cell binds the module-level name `html` to page text
    from html import escape
    from urllib.parse import quote

    try:
        # List files in the Volume folder
        existingFiles = dbutils.fs.ls(targetFolder)
        targetFiles = sorted(
            {f.name for f in existingFiles if f.name and f.name != 'index.html'}
        )

        # Generate HTML content
        listItems = "".join(
            f"<li><a href='{escape(quote(name), quote=True)}'>{escape(name)}</a></li>"
            for name in targetFiles
        )
        htmlContent = f"<html><body><ul>{listItems}</ul></body></html>"

        # Write index.html to Volume
        indexPath = targetFolder + 'index.html'
        dbutils.fs.put(indexPath, htmlContent, overwrite=True)

    except Exception as e:
        raise Exception(f"Failed to create index.html file; {str(e)}") from e

In [0]:
# Sync the source file listing into the Volume: import every listed file,
# remove Volume files that are no longer listed, then rebuild index.html.
try:
    # Create the schema and volume if they don't exist
    # NOTE(review): ensure_path is not defined in this notebook; presumably it
    # comes from the ./Utils notebook run above - confirm.
    ensure_path(catalog, schema, volume, type='volume')

    # Open link and scrape source filelist; the `with` block closes the
    # connection as soon as the page has been read
    req = urllib.request.Request(baseUrl, headers={'User-Agent': 'your-email@example.com'})
    with urllib.request.urlopen(req) as response:
        html = response.read().decode('utf-8')

    # Get source filelist: anchor texts starting with "pr", excluding URLs and folders
    pattern = r'<a[^>]*href="[^"]*">(pr[^<]+)</a>'
    sourceFiles = re.findall(pattern, html, flags=re.IGNORECASE)
    sourceFiles = {f for f in sourceFiles if '://' not in f and not f.endswith('/')}

    # An empty listing would mark every file in the Volume for deletion below,
    # so stop before touching anything
    if not sourceFiles:
        raise Exception(f"No source files found at {baseUrl}")

    # Get target filelist from Volume
    try:
        existingFiles = dbutils.fs.ls(targetVolume)
        targetFiles = {f.name for f in existingFiles if f.name}
    except Exception:
        targetFiles = set()  # Target folder doesn't exist yet

    # Files present in the Volume but no longer at the source; the generated index is kept
    filesToDelete = {f for f in targetFiles - sourceFiles if f != 'index.html'}

    # Import files
    importErrors = []
    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = [executor.submit(import_file, baseUrl, targetVolume, f)
                    for f in sourceFiles]

        # Collect every failure so one bad file does not hide the others
        for future in as_completed(futures):
            try:
                future.result()
            except Exception as e:
                print(str(e))
                importErrors.append(str(e))
    if importErrors:
        raise Exception(f"Failed to import files: {', '.join(importErrors)}")

    # Delete files. Not using ThreadPoolExecutor because dbutils.fs.rm is not likely to take long, regardless of filesize.
    deleteErrors = []
    for filename in filesToDelete:
        try:
            dbutils.fs.rm(targetVolume + filename)
            print(f"Deleted file: {filename}")
        except Exception as e:
            print(str(e))
            deleteErrors.append(str(e))
    if deleteErrors:
        raise Exception(f"Failed to delete files: {', '.join(deleteErrors)}")

    # Create index file
    create_index_html_file(targetVolume)

    print(f"Files are in {targetVolume}")

except Exception as e:
    # Chain the original exception so its traceback stays attached
    raise Exception(f"Error: {str(e)}") from e
    # Todo - better error handling