### We demonstrate how the data collection pipeline works in a few different scenarios

#### Case 1: first time running the pipeline

In [1]:
from data_collection.stock import StockMetadataManager, StockDataCollector
from data_collection.config import constants as Config

import pandas as pd
import logging

# Set up logging to print to stdout, which Cloud Run will capture
logging.basicConfig(level=logging.INFO)

# Initialize your classes outside of the endpoint
metadata_manager = StockMetadataManager(config=Config)
data_collector = StockDataCollector(metadata_manager=metadata_manager)

def trigger_pipeline():
    try:
        data, metadata = data_collector.run_ingestion_pipeline_localy()
        if data is not None and metadata is not None:
            logging.info("Data ingestion completed successfully.")
            
            # Attempt to save the updates, catching any exceptions that occur
            try:
                data_collector.save_updates(data=data, metadata=metadata)
                logging.info("Data saved successfully.")
                return {"message": "Data ingestion and save completed successfully."}
            except Exception as save_exception:
                logging.exception("Error occurred during data save.")
                return {"message": f"An error occurred during data save: {save_exception}"}

        else:
            logging.error("Data ingestion completed, but no data was returned.")
            return {"message": "Data ingestion completed, but no data was returned."}
    except Exception as e:
        logging.exception("Error occurred during data ingestion.")
        return {"message": f"An error occurred during data ingestion: {e}"}

In [2]:
Config

{'remote': {'bucket': 'stock_data_lake',
  'ticker_file': 'stock_data_lake/tickers.csv',
  'data_folder': 'stock_data_lake/data',
  'metadata_file': 'stock_data_lake/data/metadata.csv',
  'data_file': 'stock_data_lake/data/stock_history.csv'}}

There is no metadata or data file in the 'bucket', so any ticker in the ticker file will have to be ingested.

In [3]:
tickers = pd.DataFrame({'Ticker': ['AAPL', 'MSFT']})
tickers.to_csv(Config['remote']['ticker_file'], index = False)

tickers

Unnamed: 0,Ticker
0,AAPL
1,MSFT


In [4]:
trigger_pipeline()

All tickers will be ingested. No metadata csv file found stock_data_lake/data/metadata.csv.


INFO:root:Data ingestion completed successfully.
INFO:root:No existing data or metadata found. Writing new data and metadata
INFO:root:Data saved successfully.


First time ingestion: No data csv file found stock_data_lake/data/stock_history.csv.
All tickers will be ingested. No metadata csv file found stock_data_lake/data/metadata.csv.


{'message': 'Data ingestion and save completed successfully.'}

In [5]:
data = pd.read_csv(Config['remote']['data_file'])
metadata = pd.read_csv(Config['remote']['metadata_file'])

In [6]:
data

Unnamed: 0,Date,Open,High,Low,Close,Volume,Ticker
0,1980-12-12,0.099319,0.099750,0.099319,0.099319,469033600,AAPL
1,1980-12-13,0.099319,0.099750,0.099319,0.099319,469033600,AAPL
2,1980-12-14,0.099319,0.099750,0.099319,0.099319,469033600,AAPL
3,1980-12-15,0.094569,0.094569,0.094137,0.094137,175884800,AAPL
4,1980-12-16,0.087659,0.087659,0.087228,0.087228,105728000,AAPL
...,...,...,...,...,...,...,...
29454,2023-11-20,371.220001,378.869995,371.000000,377.440002,52465100,MSFT
29455,2023-11-21,375.670013,376.220001,371.119995,373.070007,28423100,MSFT
29456,2023-11-22,378.000000,379.790009,374.970001,377.850006,23345300,MSFT
29457,2023-11-23,378.000000,379.790009,374.970001,377.850006,23345300,MSFT


In [7]:
metadata

Unnamed: 0,ticker,ingestion_date,first_day,last_day,timespan,final_df_length,input_null_values,processed_null_values
0,AAPL,2023-11-26,1980-12-12,2023-11-24,15688,15688,0,0
1,MSFT,2023-11-26,1986-03-13,2023-11-24,13771,13771,0,0


#### Case 1.1: First time running the pipeline with a wrong ticker name

In [8]:
tickers = pd.DataFrame({'Ticker': ['AAPL', 'MSFT', 'FB']})
tickers.to_csv(Config['remote']['ticker_file'], index = False)

In [9]:
trigger_pipeline()

All tickers will be ingested. No metadata csv file found stock_data_lake/data/metadata.csv.


ERROR:yfinance:FB: No timezone found, symbol may be delisted
ERROR:root:Error loading history of FB. Retrieved data is not valid
INFO:root:Data ingestion completed successfully.
INFO:root:No existing data or metadata found. Writing new data and metadata
INFO:root:Data saved successfully.


First time ingestion: No data csv file found stock_data_lake/data/stock_history.csv.
All tickers will be ingested. No metadata csv file found stock_data_lake/data/metadata.csv.


{'message': 'Data ingestion and save completed successfully.'}

In [10]:
data = pd.read_csv(Config['remote']['data_file'])
metadata = pd.read_csv(Config['remote']['metadata_file'])

In [11]:
data

Unnamed: 0,Date,Open,High,Low,Close,Volume,Ticker
0,1980-12-12,0.099319,0.099750,0.099319,0.099319,469033600,AAPL
1,1980-12-13,0.099319,0.099750,0.099319,0.099319,469033600,AAPL
2,1980-12-14,0.099319,0.099750,0.099319,0.099319,469033600,AAPL
3,1980-12-15,0.094569,0.094569,0.094137,0.094137,175884800,AAPL
4,1980-12-16,0.087659,0.087659,0.087228,0.087228,105728000,AAPL
...,...,...,...,...,...,...,...
29454,2023-11-20,371.220001,378.869995,371.000000,377.440002,52465100,MSFT
29455,2023-11-21,375.670013,376.220001,371.119995,373.070007,28423100,MSFT
29456,2023-11-22,378.000000,379.790009,374.970001,377.850006,23345300,MSFT
29457,2023-11-23,378.000000,379.790009,374.970001,377.850006,23345300,MSFT


In [12]:
metadata

Unnamed: 0,ticker,ingestion_date,first_day,last_day,timespan,final_df_length,input_null_values,processed_null_values
0,AAPL,2023-11-26,1980-12-12,2023-11-24,15688,15688,0,0
1,MSFT,2023-11-26,1986-03-13,2023-11-24,13771,13771,0,0


#### Case 2: running the pipeline when an update is not needed

In [13]:
trigger_pipeline()

ERROR:yfinance:FB: No timezone found, symbol may be delisted
ERROR:root:Error loading history of FB. Retrieved data is not valid


INFO:root:Data ingestion completed successfully.
INFO:root:Data saved successfully.


{'message': 'Data ingestion and save completed successfully.'}

In [14]:
data = pd.read_csv(Config['remote']['data_file'])
metadata = pd.read_csv(Config['remote']['metadata_file'])

In [15]:
data[data['Date'] == '2023-11-24']

Unnamed: 0,Date,Open,High,Low,Close,Volume,Ticker
15687,2023-11-24,190.869995,190.899994,189.25,189.970001,24048300,AAPL
29458,2023-11-24,377.329987,377.970001,375.140015,377.429993,10176600,MSFT
29459,2023-11-24,190.869995,190.899994,189.25,189.970001,24048344,AAPL
29460,2023-11-24,377.329987,377.970001,375.13501,377.429993,10176649,MSFT


In [16]:
metadata

Unnamed: 0,ticker,ingestion_date,first_day,last_day,timespan,final_df_length,input_null_values,processed_null_values
0,AAPL,2023-11-26,1980-12-12,2023-11-24,15688,15688,0,0
1,MSFT,2023-11-26,1986-03-13,2023-11-24,13771,13771,0,0
2,AAPL,2023-11-26,2023-11-24,2023-11-24,1,1,0,0
3,MSFT,2023-11-26,2023-11-24,2023-11-24,1,1,0,0


#### Case 2.1: adding new ticker of interest in future iterations of the pipeline

In [17]:
tickers = pd.DataFrame({'Ticker': ['AAPL', 'MSFT', 'NVDA']})
tickers.to_csv(Config['remote']['ticker_file'], index = False)

tickers

Unnamed: 0,Ticker
0,AAPL
1,MSFT
2,NVDA


In [18]:
trigger_pipeline()

INFO:root:Data ingestion completed successfully.
INFO:root:Data saved successfully.


{'message': 'Data ingestion and save completed successfully.'}

In [19]:
data = pd.read_csv(Config['remote']['data_file'])
metadata = pd.read_csv(Config['remote']['metadata_file'])

In [20]:
data[data['Date'] == '2023-11-24']

Unnamed: 0,Date,Open,High,Low,Close,Volume,Ticker
15687,2023-11-24,190.869995,190.899994,189.25,189.970001,24048300,AAPL
29458,2023-11-24,377.329987,377.970001,375.140015,377.429993,10176600,MSFT
29459,2023-11-24,190.869995,190.899994,189.25,189.970001,24048344,AAPL
29460,2023-11-24,377.329987,377.970001,375.13501,377.429993,10176649,MSFT
29461,2023-11-24,190.869995,190.899994,189.25,189.970001,24048344,AAPL
29462,2023-11-24,377.329987,377.970001,375.13501,377.429993,10176649,MSFT
38535,2023-11-24,484.700012,489.209991,477.450012,477.76001,29464500,NVDA


In [21]:
metadata

Unnamed: 0,ticker,ingestion_date,first_day,last_day,timespan,final_df_length,input_null_values,processed_null_values
0,AAPL,2023-11-26,1980-12-12,2023-11-24,15688,15688,0,0
1,MSFT,2023-11-26,1986-03-13,2023-11-24,13771,13771,0,0
2,AAPL,2023-11-26,2023-11-24,2023-11-24,1,1,0,0
3,MSFT,2023-11-26,2023-11-24,2023-11-24,1,1,0,0
4,NVDA,2023-11-26,1999-01-22,2023-11-24,9073,9073,0,0


#### Case 2.2: updating the data

In [22]:
#We will create fake data and metadata files to demonstrate the update methods
data = pd.read_csv(Config['remote']['data_file'])
data['Date'] = pd.to_datetime(data['Date'])
data = data[(data['Date'] > pd.to_datetime('2023-11-01')) & (data['Date'] < pd.to_datetime('2023-11-10'))]
metadata = data.groupby('Ticker').max().reset_index()

metadata.rename(columns={'Ticker': 'ticker', 'Date': 'ingestion_date'}, inplace=True)
metadata.drop(['Open', 'High', 'Low', 'Close', 'Volume'], axis = 1, inplace=True)
metadata['first_day'] = metadata['ingestion_date'] - pd.DateOffset(days = 10)
metadata['last_day'] = metadata['ingestion_date']

metadata['timespan'] = 10
metadata['final_df_length'] = 10
metadata['input_null_values'] = 0
metadata['processed_null_values'] = 0

metadata

Unnamed: 0,ticker,ingestion_date,first_day,last_day,timespan,final_df_length,input_null_values,processed_null_values
0,AAPL,2023-11-09,2023-10-30,2023-11-09,10,10,0,0
1,MSFT,2023-11-09,2023-10-30,2023-11-09,10,10,0,0
2,NVDA,2023-11-09,2023-10-30,2023-11-09,10,10,0,0


In [23]:
data

Unnamed: 0,Date,Open,High,Low,Close,Volume,Ticker
15665,2023-11-02,175.289074,177.546095,175.229156,177.33638,77334800,AAPL
15666,2023-11-03,174.010754,176.587362,173.121926,176.417572,79763700,AAPL
15667,2023-11-04,174.010754,176.587362,173.121926,176.417572,79763700,AAPL
15668,2023-11-05,174.010754,176.587362,173.121926,176.417572,79763700,AAPL
15669,2023-11-06,176.147945,179.19392,175.978171,178.994186,63841300,AAPL
15670,2023-11-07,178.944239,182.199959,178.734524,181.58078,70530000,AAPL
15671,2023-11-08,182.110085,183.208629,181.351076,182.649368,49340300,AAPL
15672,2023-11-09,182.719278,183.87774,181.570782,182.169998,53763500,AAPL
29436,2023-11-02,346.536642,348.123418,344.071644,347.614471,24348100,MSFT
29437,2023-11-03,348.921826,353.672194,346.626467,352.085388,23624000,MSFT


In [24]:
metadata.to_csv(Config['remote']['metadata_file'], index = False)
data.to_csv(Config['remote']['data_file'], index = False)

In [25]:
trigger_pipeline()

INFO:root:Data ingestion completed successfully.
INFO:root:Data saved successfully.


{'message': 'Data ingestion and save completed successfully.'}

In [26]:
data = pd.read_csv(Config['remote']['data_file'])
metadata = pd.read_csv(Config['remote']['metadata_file'])

In [27]:
data

Unnamed: 0,Date,Open,High,Low,Close,Volume,Ticker
0,2023-11-02,175.289074,177.546095,175.229156,177.336380,77334800,AAPL
1,2023-11-03,174.010754,176.587362,173.121926,176.417572,79763700,AAPL
2,2023-11-04,174.010754,176.587362,173.121926,176.417572,79763700,AAPL
3,2023-11-05,174.010754,176.587362,173.121926,176.417572,79763700,AAPL
4,2023-11-06,176.147945,179.193920,175.978171,178.994186,63841300,AAPL
...,...,...,...,...,...,...,...
64,2023-11-20,493.119995,505.480011,491.809998,504.089996,41412000,NVDA
65,2023-11-21,501.260010,505.170013,492.220001,499.440002,56574700,NVDA
66,2023-11-22,498.519989,503.350006,476.899994,487.160004,89942000,NVDA
67,2023-11-23,498.519989,503.350006,476.899994,487.160004,89942000,NVDA


In [28]:
metadata

Unnamed: 0,ticker,ingestion_date,first_day,last_day,timespan,final_df_length,input_null_values,processed_null_values
0,AAPL,2023-11-09,2023-10-30,2023-11-09,10,10,0,0
1,MSFT,2023-11-09,2023-10-30,2023-11-09,10,10,0,0
2,NVDA,2023-11-09,2023-10-30,2023-11-09,10,10,0,0
3,AAPL,2023-11-26,2023-11-10,2023-11-24,15,15,0,0
4,MSFT,2023-11-26,2023-11-10,2023-11-24,15,15,0,0
5,NVDA,2023-11-26,2023-11-10,2023-11-24,15,15,0,0
