# Ingest Data into GCP 
## Local Development Notebook

In [10]:
from utils import dict_from_yaml
from ingest import IngestYahoo

### Check Out the API

### Run the Process Locally

In [11]:
config = dict_from_yaml('config.yaml')
ingest_yahoo = IngestYahoo(config)

### Check Config & Method Help

In [12]:
ingest_yahoo.config

{'env': 'dev',
 'api': {'name': 'yfinance',
  'tables': {'stock_history': {'tickers': ['MSFT', 'GOOGL', 'AAPL'],
    'period': '6mo'}}},
 'gcp': {'key_file': 'data-science-on-gcp-323609-ec18149ed324.json',
  'upload': {'dataset_id': 'yfinance_raw',
   'bucketname': 'data-science-on-gcp-323609-yfinance-staging',
   'schema_path': 'schemas/stock_history.json',
   'partition_col': 'Date',
   'partition_type': 'MONTH',
   'window': 3,
   'lag': 1,
   'add_updated_at': True}}}

In [13]:
help(ingest_yahoo.run)

Help on method run in module ingest.base:

run(env: str, overrides: dict = None) -> None method of ingest.IngestYahoo instance
    Ingestion process runner method to download, parse and upload api data into bigquery
    @param env environment determines which authentication method is used for GCP
    @param overrides overrides for default config, dict with same structure as config



Extracting the data creates a dictionary of dataframes, keyed by table name, in the raw format returned by the API.

In [14]:
df_dict_raw = ingest_yahoo.extract()
df_dict_raw['stock_history'].head(5)

Date,Open,High,Low,Close,Volume,Dividends,Stock Splits,ticker
2022-11-07 00:00:00-05:00,220.813095,227.199057,220.106852,226.661911,33498000,0.0,0.0,MSFT
2022-11-08 00:00:00-05:00,227.487519,230.421877,224.642681,227.656616,28192500,0.0,0.0,MSFT
2022-11-09 00:00:00-05:00,226.164555,227.417885,223.140679,223.319717,27852900,0.0,0.0,MSFT
2022-11-10 00:00:00-05:00,234.181827,242.039953,233.754114,241.691803,46268000,0.0,0.0,MSFT
2022-11-11 00:00:00-05:00,241.701744,246.675236,240.647351,245.799896,34620200,0.0,0.0,MSFT
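
The notebook does not show how `extract()` assembles this dictionary. The sketch below is one plausible shape, under stated assumptions: `build_raw_tables`, `fetch_history`, and `fake_history` are hypothetical names, and the fetcher is injected so that the example runs without network access.

```python
import pandas as pd


def build_raw_tables(tickers, period, fetch_history):
    """Return {table_name: DataFrame} with all tickers stacked in one frame."""
    frames = []
    for ticker in tickers:
        frame = fetch_history(ticker, period).copy()
        frame["ticker"] = ticker  # tag each row with its source ticker
        frames.append(frame)
    return {"stock_history": pd.concat(frames)}


def fake_history(ticker, period):
    # Placeholder fetcher with made-up values, for illustration only.
    index = pd.DatetimeIndex(["2022-11-07", "2022-11-08"], name="Date")
    return pd.DataFrame({"Close": [1.0, 2.0]}, index=index)


df_dict = build_raw_tables(["MSFT", "GOOGL"], "6mo", fake_history)
print(df_dict["stock_history"])
```

With `yfinance` installed, a real fetcher could be `lambda ticker, period: yfinance.Ticker(ticker).history(period=period)`.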


The transformation step resets the index so that `Date` becomes a regular column, which makes the dataframe ready for upload to GCP.

In [15]:
df_dict_transformed = ingest_yahoo.transform(df_dict_raw)
df_dict_transformed['stock_history'].head(5)

,Date,Open,High,Low,Close,Volume,Dividends,Stock Splits,ticker
0,2022-11-07 00:00:00-05:00,220.813095,227.199057,220.106852,226.661911,33498000,0.0,0.0,MSFT
1,2022-11-08 00:00:00-05:00,227.487519,230.421877,224.642681,227.656616,28192500,0.0,0.0,MSFT
2,2022-11-09 00:00:00-05:00,226.164555,227.417885,223.140679,223.319717,27852900,0.0,0.0,MSFT
3,2022-11-10 00:00:00-05:00,234.181827,242.039953,233.754114,241.691803,46268000,0.0,0.0,MSFT
4,2022-11-11 00:00:00-05:00,241.701744,246.675236,240.647351,245.799896,34620200,0.0,0.0,MSFT
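
The effect of this step can be reproduced with plain pandas. This is a minimal sketch, assuming `transform()` does little more than call `reset_index()`; the notebook does not show its implementation, and the values below are made up.

```python
import pandas as pd

# Made-up values; only the shape of the frame matters here.
raw = pd.DataFrame(
    {"Close": [1.0, 2.0], "ticker": ["MSFT", "MSFT"]},
    index=pd.DatetimeIndex(["2022-11-07", "2022-11-08"], name="Date"),
)

# reset_index() moves the named index into a regular "Date" column
# and replaces it with a default 0..n-1 integer index.
transformed = raw.reset_index()
print(transformed.columns.tolist())
```

Keeping `Date` as a column matters here because the config names it as `partition_col`.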


## Test Run Method

Before running this, complete the first and second setup steps (schema autodetection and the full load with config overrides, both described below).

In [16]:
ingest_yahoo.run(env='dev')

Extracting data from endpoint: yfinance
range: (2023-05-01 00:00:00, 2023-05-31 00:00:00)
Uploading dataframe to gcslocation: gs://data-science-on-gcp-323609-yfinance-staging/20230507:stock_history$202305
Uploading gcsfile from gs://data-science-on-gcp-323609-yfinance-staging/20230507:stock_history$202305 to bigquery table: yfinance_raw:stock_history$202305
Uploading data from gs://data-science-on-gcp-323609-yfinance-staging/20230507:stock_history$202305 to table stock_history
range: (2023-04-01 00:00:00, 2023-04-30 00:00:00)
Uploading dataframe to gcslocation: gs://data-science-on-gcp-323609-yfinance-staging/20230507:stock_history$202304
Uploading gcsfile from gs://data-science-on-gcp-323609-yfinance-staging/20230507:stock_history$202304 to bigquery table: yfinance_raw:stock_history$202304
Uploading data from gs://data-science-on-gcp-323609-yfinance-staging/20230507:stock_history$202304 to table stock_history
range: (2023-03-01 00:00:00, 2023-03-31 00:00:00)
Uploading dataframe to gcs
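
The log above suggests that `run` walks backwards through monthly partitions, starting at the month of the run date (the `20230507` file prefix) and producing one `$YYYYMM` partition decorator per month. The sketch below reproduces just that date arithmetic. `month_partitions` is a hypothetical helper, not part of the `ingest` package, and `lag` is left out because its semantics are not shown in this notebook.

```python
import calendar
from datetime import date, datetime


def month_partitions(run_date, window):
    """Return (start, end, decorator) for `window` months, newest first."""
    year, month = run_date.year, run_date.month
    partitions = []
    for _ in range(window):
        last_day = calendar.monthrange(year, month)[1]
        start = datetime(year, month, 1)
        end = datetime(year, month, last_day)
        partitions.append((start, end, f"{year}{month:02d}"))
        # Step back one month, rolling over the year boundary.
        month -= 1
        if month == 0:
            year, month = year - 1, 12
    return partitions


for start, end, decorator in month_partitions(date(2023, 5, 7), window=3):
    print(f"range: ({start}, {end}) -> stock_history${decorator}")
```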

### Autodetect dataframe csv schema

**This can be a useful first setup step.**

When building a production process that loads data into a BigQuery table, you usually need to define the table schema. With `autodetect_mode` enabled, the process uploads the first 100 rows of your dataframe to a temporary BigQuery table, writes the detected schema to a JSON file, and then drops the temporary table. Use the autodetected schema as a starting point for your own; this is especially helpful when the table has many columns.

In [3]:
ingest_yahoo.config['api']['tables']['stock_history']['period'] = 'max' 
ingest_yahoo.config['gcp']['upload']['autodetect_mode'] = True 
ingest_yahoo.config

{'env': 'dev',
 'api': {'name': 'yfinance',
  'tables': {'stock_history': {'tickers': ['MSFT', 'GOOGL', 'AAPL'],
    'period': 'max'}}},
 'gcp': {'key_file': 'data-science-on-gcp-323609-ec18149ed324.json',
  'upload': {'dataset_id': 'yfinance_raw',
   'bucketname': 'data-science-on-gcp-323609-yfinance-staging',
   'schema_path': 'schemas/stock_history.json',
   'partition_col': 'Date',
   'partition_type': 'MONTH',
   'window': 3,
   'lag': 1,
   'add_updated_at': True,
   'autodetect_mode': True}}}

In [4]:
ingest_yahoo.run(env='dev')

Extracting data from endpoint: yfinance
Filtering first 100 rows of dataframe to autodetect schema
Uploading dataframe to bucket: gs://data-science-on-gcp-323609-yfinance-staging
Uploading gcsfile from gs://data-science-on-gcp-323609-yfinance-staging/20230507:TEMP_AD_stock_history to bigquery table: yfinance_raw:TEMP_AD_stock_history
Uploading data from gs://data-science-on-gcp-323609-yfinance-staging/20230507:TEMP_AD_stock_history to table TEMP_AD_stock_history
Dropping table used for autodetect TEMP_AD_stock_history
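
For a rough idea of what a schema file looks like before involving BigQuery at all, the sketch below guesses column types locally from pandas dtypes. This is a swapped-in approximation, not the BigQuery autodetection used by `run`: `guess_bq_schema` is a hypothetical helper, the dtype mapping is an assumption, and the sample values are made up. The output follows the BigQuery JSON schema layout of `name`, `type`, and `mode` fields.

```python
import json

import pandas as pd
from pandas.api import types as ptypes


def guess_bq_schema(df):
    """Map pandas dtypes to BigQuery column types (rough local guess)."""
    schema = []
    for name, dtype in df.dtypes.items():
        if ptypes.is_bool_dtype(dtype):
            bq_type = "BOOLEAN"
        elif ptypes.is_integer_dtype(dtype):
            bq_type = "INTEGER"
        elif ptypes.is_float_dtype(dtype):
            bq_type = "FLOAT"
        elif ptypes.is_datetime64_any_dtype(dtype):
            bq_type = "TIMESTAMP"
        else:
            bq_type = "STRING"  # fallback for object/string columns
        schema.append({"name": name, "type": bq_type, "mode": "NULLABLE"})
    return schema


# Made-up sample row; only the dtypes matter.
sample = pd.DataFrame(
    {
        "Date": pd.to_datetime(["2022-11-07"]),
        "Volume": [100],
        "Close": [1.5],
        "ticker": ["MSFT"],
    }
)
schema = guess_bq_schema(sample.head(100))
print(json.dumps(schema, indent=2))
```

Either way, treat the result as a draft and review each column type by hand before using it in production.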


### Override default config

**This can be a useful second setup step.**

Use this when you want to run a manual full load, which you will usually do once before scheduling regular uploads. The overrides are passed as an argument to `run` so that the same payload can be sent to the Cloud Run service. `overrides` must have the same structure as your `config.yaml`.

In [8]:
config = dict_from_yaml('config.yaml')
ingest_yahoo = IngestYahoo(config)

# One-off full load using max stock data; set window to None
overrides = {
    'api': {
        'tables': {
            'stock_history':
                {'period': 'max'}
            }
    },
    'gcp': {
        'upload': {
            'window': None
        }
    }
}

In [9]:
ingest_yahoo.run(env='dev', overrides=overrides)

Overriding default config
Extracting data from endpoint: yfinance
Uploading dataframe to bucket: gs://data-science-on-gcp-323609-yfinance-staging
Uploading gcsfile from gs://data-science-on-gcp-323609-yfinance-staging/20230507:stock_history to bigquery table: yfinance_raw:stock_history
Uploading data from gs://data-science-on-gcp-323609-yfinance-staging/20230507:stock_history to table stock_history
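
The "same structure" requirement is easiest to see with a recursive merge. The sketch below is one way such a merge could work, assuming that nested dictionaries are merged key by key and that any other value replaces the default; `merge_overrides` is a hypothetical helper, and the actual logic inside `run` is not shown in this notebook. The config here is a trimmed copy of the one printed earlier.

```python
import copy
import json


def merge_overrides(config, overrides):
    """Return a copy of config with overrides applied recursively."""
    merged = copy.deepcopy(config)
    for key, value in (overrides or {}).items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = merge_overrides(merged[key], value)
        else:
            merged[key] = value  # leaf values (including None) replace defaults
    return merged


config = {
    "api": {"tables": {"stock_history": {"tickers": ["MSFT", "GOOGL", "AAPL"],
                                         "period": "6mo"}}},
    "gcp": {"upload": {"window": 3, "lag": 1}},
}
overrides = {
    "api": {"tables": {"stock_history": {"period": "max"}}},
    "gcp": {"upload": {"window": None}},
}

merged = merge_overrides(config, overrides)
print(merged["api"]["tables"]["stock_history"])
print(merged["gcp"]["upload"])

# The overrides are plain dicts, so they serialize to a JSON payload;
# None becomes null and survives the round trip.
payload = json.dumps(overrides)
print(payload)
```

Because only the keys present in `overrides` are touched, settings such as `tickers` and `lag` keep their defaults.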
