In [None]:
! pip install ../.

In [None]:
import logging
import os
import pandas as pd
import glob
from src.ingestion.plugins.ingestor import run_preprocess_data_and_insert_db
 
# Make INFO-level log records visible in the notebook.
# Setting only the level is not enough when the root logger has no handler: records
# from named loggers then fall through to logging.lastResort, which drops anything
# below WARNING. basicConfig attaches a stderr handler when none exists yet.
logging.basicConfig(level=logging.INFO)
# basicConfig is a no-op if handlers are already attached, so set the level explicitly too.
logging.getLogger().setLevel(logging.INFO)


## 1. Load data and config

In [None]:
from pathlib import Path
import pandas as pd
import json

# Project root: the absolute parent of the notebook's working directory.
root_dir = Path("..").resolve()
config_folder = root_dir.joinpath("config")
# Raw downloads and processed outputs live in a sibling `nfl-airflow` directory.
downloads_folder, outputs_folder = (
    root_dir.parent.joinpath("nfl-airflow", subdir) for subdir in ("downloads", "outputs")
)

In [None]:
# Load the pipeline configuration from config/base_config.json.
# JSON text is UTF-8 by specification, so pin the encoding instead of relying on the
# platform/locale default (which differs on e.g. Windows).
with open(os.path.join(config_folder, 'base_config.json'), encoding='utf-8') as file:
    base_config = json.load(file)

In [None]:
# Split the config into its two top-level sections (keys read in the same order).
# NOTE(review): global_details is not used by the cells shown here.
global_details, bronze_layer = base_config['global_details'], base_config['bronze_layer']
# Bare expression: display the configured bronze-layer datasets as the cell output.
bronze_layer['datasets']


## 2. Preprocess raw downloads

In [None]:
# 2. Preprocess each bronze-layer dataset with the project ingestor.
# Each dataset gets its own downloads/<query_target> input path and
# outputs/<query_target> output directory. What the ingestor reads and writes there
# is defined in src.ingestion — presumably it produces the
# bronze_<query_target>_processed.csv files loaded in section 3 (confirm).
for dataset_config in bronze_layer['datasets']:
    query_target = dataset_config["query_target"]
    local_downloads = os.path.join(downloads_folder, query_target)
    local_outputs = os.path.join(outputs_folder, query_target)

    # exist_ok=True replaces the exists()/makedirs() check-then-create pair: no race,
    # and the cell can be re-run safely.
    os.makedirs(local_outputs, exist_ok=True)

    run_preprocess_data_and_insert_db(
        local_downloads,
        query_target,
        f"bronze_{query_target}",
        "nfl_postgres",
        dataset_config['preprocessing_config']['headers'],
        local_outputs,
    )

## 3. Ingest into SQL DB

In [None]:
from sqlalchemy import create_engine

In [None]:
from urllib.parse import quote_plus

# Database connection settings. Each value can be overridden through an environment
# variable; the defaults reproduce the local development setup, so the notebook runs
# unchanged. Do not commit real credentials into the notebook.
pg_user = os.environ.get("NFL_PG_USER", "postgres")
pg_password = os.environ.get("NFL_PG_PASSWORD", "postgres")
pg_host = os.environ.get("NFL_PG_HOST", "localhost")
pg_port = os.environ.get("NFL_PG_PORT", "5432")
pg_db = os.environ.get("NFL_PG_DB", "nfl_postgres")

# quote_plus escapes characters such as '@', ':' or '/' that would otherwise corrupt the URL.
engine = create_engine(
    f"postgresql+psycopg2://{pg_user}:{quote_plus(pg_password)}@{pg_host}:{pg_port}/{pg_db}"
)
# Kept open for the ingestion cell that follows; close it (and engine.dispose())
# when finished with the database.
conn = engine.connect()

In [None]:
# Load each dataset's processed CSV and write it to the database as table
# <query_target> inside schema bronze_<query_target>.
for dataset_config in bronze_layer['datasets']:
    query_target = dataset_config["query_target"]
    local_outputs = os.path.join(outputs_folder, query_target)
    file_path = os.path.join(local_outputs, f"bronze_{query_target}_processed.csv")

    df = pd.read_csv(file_path)
    # if_exists='replace' drops the table and recreates it on every run.
    # NOTE(review): to_sql writes the DataFrame index as an extra "index" column by
    # default; pass index=False if that column is not wanted.
    # NOTE(review): to_sql does not create the schema itself; bronze_<query_target>
    # must already exist (presumably created during preprocessing — confirm).
    df.to_sql(
        query_target,
        conn,
        if_exists='replace',
        schema=f'bronze_{query_target}',
    )
    # Lazy %-style arguments: the message is only formatted when INFO is enabled.
    logging.info(
        "Wrote bronze_%s_processed into schema bronze_%s", query_target, query_target
    )