##### To-Do
- inspect tables
  - column names
- create dbt project in dbt folder
  - create new db for datawarehouse -> noaa_dw
- populate README

In [1]:
import duckdb
import requests
from pathlib import Path
import pandas as pd
from io import BytesIO
from zipfile import ZipFile, is_zipfile
import os, shutil

In [2]:
# Load the SQL magic extension so the %sql / %%sql magics used below are available.
%load_ext sql
conn = duckdb.connect('../duckdb/noaa_db.duckdb') #create persistent db
# Hand the open DuckDB connection to the SQL magic and register it under the alias "duckdb".
%sql conn --alias duckdb

In [3]:
# SqlMagic settings (these configure the %sql magic's output, not DuckDB itself):
#   autopandas = True  -> query results come back as pandas DataFrames
#   feedback = False   -> turn off the magic's feedback messages
#   displaycon = False -> do not echo the connection string with each result
%config SqlMagic.autopandas = True
%config SqlMagic.feedback = False
%config SqlMagic.displaycon = False

## Functions

In [4]:
def fetch_folders(base_url: str) -> list:
    """Get list of zip folders containing NOAA data.

    Downloads the page at ``base_url``, parses its HTML tables with
    ``pandas.read_html`` and takes the last table as the directory listing.

    Args:
        base_url: URL of the HTML page that lists the archives.

    Returns:
        The values of the listing's ``Name`` column that end with ``.zip``,
        in listing order.

    Raises:
        requests.HTTPError: if the server answers with a 4xx/5xx status.
        ValueError: if pandas finds no table on the page.
    """
    response = requests.get(base_url)
    # Fail loudly instead of trying to parse an error page as a listing.
    response.raise_for_status()

    # read_html wants a path, URL or file-like object; passing the raw
    # markup directly is deprecated, so wrap the bytes in a buffer.
    tables = pd.read_html(BytesIO(response.content))
    listing = tables[-1]

    # na=False: rows with a missing Name are treated as "not a zip".
    is_zip = listing.Name.str.endswith(".zip", na=False)

    return listing.loc[is_zip, "Name"].tolist()

In [5]:
def unzip_folder(lszip: list, source_url: "str | None" = None) -> dict:
    """Download each zip archive and return its members keyed by file name.

    Args:
        lszip: Archive file names, relative to the source URL.
        source_url: URL of the directory that holds the archives. When
            omitted, the notebook-level ``base_url`` variable is used, so
            existing ``unzip_folder(lszip)`` calls keep working.

    Returns:
        A dict mapping each archive member's name to its raw bytes. If the
        same member name occurs in several archives, the last one read wins.
        Downloads that are not valid zip files are skipped silently.
    """
    dictfile = {}
    if not lszip:
        # Nothing to download; also avoids touching base_url needlessly.
        return dictfile

    root = base_url if source_url is None else source_url
    # rstrip instead of [:-1] so a URL without a trailing slash is not
    # truncated by one character.
    root = root.rstrip("/")

    for zip_name in lszip:
        response = requests.get(f"{root}/{zip_name}")
        zipbytes = BytesIO(response.content)

        # Best effort: anything that is not a zip archive (for example an
        # error page) is ignored rather than aborting the whole run.
        if not is_zipfile(zipbytes):
            continue

        with ZipFile(zipbytes, "r") as archive:
            for member in archive.namelist():
                dictfile[member] = archive.read(member)

    return dictfile

In [6]:
def write_csv(dictfile: dict, tmp_dir: str = "../tmp") -> None:
    """Save csv files to tmp folder.

    Args:
        dictfile: Mapping of file name to raw file content (bytes).
        tmp_dir: Directory to write into; created (with parents) if it does
            not exist. Defaults to ``../tmp``.

    Each entry is written to ``<tmp_dir>/<stem>.csv`` where ``<stem>`` is the
    part of the file name before its first ``.``. Existing files with the
    same name are overwritten.
    """
    Path(tmp_dir).mkdir(parents=True, exist_ok=True)

    for filename, content in dictfile.items():
        # Keep only the text before the first "." and force a .csv suffix.
        stem = filename.split(".")[0]
        # Context manager guarantees the handle is closed even if the
        # write fails part-way.
        with open(f"{tmp_dir}/{stem}.csv", "wb") as outfile:
            outfile.write(content)

In [7]:
def delete_csv(folder: str) -> None:
    """Remove the given folder and everything inside it.

    Args:
        folder: Path of the directory to delete (the notebook passes the
            ``../tmp/`` staging directory).

    Raises:
        FileNotFoundError: if ``folder`` does not exist.
    """
    # Honour the argument instead of always deleting a hard-coded path.
    shutil.rmtree(folder)

In [8]:
def ingest_noaa(base_url: str) -> None:
    """Fetch the NOAA zip archives and stage their contents as csv files.

    Chains the three helpers: ``fetch_folders`` lists the archive names found
    at ``base_url``, ``unzip_folder`` downloads and extracts them, and
    ``write_csv`` writes the extracted files to disk. Nothing is loaded into
    the database here.
    """
    archive_names = fetch_folders(base_url)
    # Only the archive names are passed on to unzip_folder.
    extracted = unzip_folder(archive_names)
    write_csv(extracted)

In [9]:
def get_dataset_size(dataset: str, tmp_dir: str = '../tmp/') -> None:
    """Print the total size, in GB, of the files belonging to a dataset.

    Args:
        dataset: File-name prefix that identifies the dataset; every entry
            in ``tmp_dir`` whose name starts with it is counted.
        tmp_dir: Directory to scan. Defaults to ``../tmp/``.

    The size is reported in units of 1024**3 bytes, rounded to two decimals.
    Nothing is returned; the result is printed.
    """
    prefix = str(dataset)
    total_bytes = sum(
        os.path.getsize(os.path.join(tmp_dir, name))
        for name in os.listdir(tmp_dir)
        if name.startswith(prefix)
    )

    size_gb = round(total_bytes / (1024**3), 2)
    print(f'{dataset} files size: {size_gb} GB')

## Initial Loading

In [10]:
# URL of the NOAA MRIP survey CSV directory listing.
# NOTE: this must stay a notebook-level variable named base_url, because
# unzip_folder reads base_url from the global scope rather than receiving it
# as an argument.
base_url = "https://www.st.nmfs.noaa.gov/st1/recreational/MRIP_Survey_Data/CSV/"
# Download every listed zip archive and write its members to ../tmp as csv files.
ingest_noaa(base_url)

In [11]:
# Report the on-disk size (in GB) of each staged dataset, one line per dataset
for dataset_name in ('catch', 'size', 'trip'):
    get_dataset_size(dataset_name)

catch files size: 1.08 GB
size files size: 1.07 GB
trip files size: 1.04 GB


In [12]:
%%sql
-- Create the raw catch table from every ../tmp/catch_*.csv file.
-- union_by_name aligns the files by column name, filename = true adds the
-- source file path as a column, and all_varchar = true loads every column as text.
-- NOTE(review): the "# ignore_errors" line below uses "#", which is not the
-- usual SQL comment marker ("--"); confirm it is tolerated before re-running.
CREATE TABLE catch AS
SELECT * FROM read_csv('../tmp/catch_*.csv', 
    normalize_names = true,
    # ignore_errors = true,
    union_by_name = true, 
    filename =  true, 
    auto_detect = true,
    null_padding = true,
    all_varchar = true
    );


Unnamed: 0,Count
0,5654240


In [13]:
%%sql
-- Create the raw size table from every ../tmp/size_*.csv file.
-- union_by_name aligns the files by column name, filename = true adds the
-- source file path as a column, and all_varchar = true loads every column as text.
-- NOTE(review): the "# ignore_errors" line below uses "#", which is not the
-- usual SQL comment marker ("--"); confirm it is tolerated before re-running.
CREATE TABLE size AS
SELECT * FROM read_csv('../tmp/size_*.csv', 
    normalize_names = true,
    # ignore_errors = true,
    union_by_name = true, 
    filename =  true, 
    auto_detect = true,
    null_padding = true,
    all_varchar = true
    );


Unnamed: 0,Count
0,7143915


In [14]:
%%sql
-- Create the raw trip table from every ../tmp/trip_*.csv file.
-- union_by_name aligns the files by column name, filename = true adds the
-- source file path as a column, and all_varchar = true loads every column as text.
-- NOTE(review): the "# ignore_errors" line below uses "#", which is not the
-- usual SQL comment marker ("--"); confirm it is tolerated before re-running.
CREATE TABLE trip AS
SELECT * FROM read_csv('../tmp/trip_*.csv', 
    normalize_names = true,
    # ignore_errors = true,
    union_by_name = true, 
    filename =  true, 
    auto_detect = true,
    null_padding = true,
    all_varchar = true
    );


Unnamed: 0,Count
0,3638647


In [15]:
# Delete tmp folder and contents to free up space on local machine.
# The csv data has already been copied into the catch, size and trip tables
# by the CREATE TABLE ... AS SELECT cells above.
delete_csv('../tmp/')

## Queries

In [4]:
 %reload_ext sql

In [8]:
%%sql 
-- List every table and view known to the database, with its catalog, schema and type.
SELECT * FROM INFORMATION_SCHEMA.tables;

Unnamed: 0,table_catalog,table_schema,table_name,table_type,self_referencing_column_name,reference_generation,user_defined_type_catalog,user_defined_type_schema,user_defined_type_name,is_insertable_into,is_typed,commit_action
0,noaa_db,main,catch,BASE TABLE,,,,,,YES,NO,
1,noaa_db,main,size,BASE TABLE,,,,,,YES,NO,
2,noaa_db,main,trip,BASE TABLE,,,,,,YES,NO,
3,noaa_db,datawarehouse,stg_noaa_mrip__catches,VIEW,,,,,,NO,NO,


In [44]:
# Store the column names of each raw table in a Python variable
# (catch_cols, size_cols, trip_cols) using the SQL magic's "<<" assignment.
# The filter is on table_name only, so same-named tables in other schemas
# would be included as well.
%sql catch_cols << SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.columns WHERE table_name = 'catch';
%sql size_cols << SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.columns WHERE table_name = 'size';
%sql trip_cols << SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.columns WHERE table_name = 'trip';

In [50]:
%%sql
-- Show the column names and column types of the size table.
DESCRIBE noaa_db.size;

Unnamed: 0,column_name,column_type,null,key,default,extra


In [7]:
%%sql
-- Drop the view stg_noaa_mrip__catches from noaa_db.main.
-- NOTE(review): the INFORMATION_SCHEMA.tables listing in this notebook shows
-- this view under the datawarehouse schema, not main; confirm the target schema.
DROP VIEW noaa_db.main.stg_noaa_mrip__catches

Unnamed: 0,Success
