<a href="https://colab.research.google.com/github/mminvestai/warren-says/blob/main/ingest2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import argparse
import logging
import re
from time import sleep

import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions

import yfinance as yf
import pandas as pd
from pandas import read_csv

# Number of periods in a calendar year for the month/quarter intervals. Weeks
# are handled with ISO-calendar date arithmetic instead, because an ISO year
# has either 52 or 53 weeks.
_PERIODS_PER_YEAR = {'month': 12, 'quarter': 4}


def _period_and_year(dates, interval):
    """Return the (period number, year) Series for each timestamp in *dates*.

    For 'week' this is the ISO week and ISO year; for 'month' / 'quarter' it
    is the calendar month / quarter and the calendar year.
    """
    if interval == 'week':
        # Series.dt.week was removed in pandas 2.0; isocalendar() is the
        # supported way to get the ISO week (and its matching ISO year).
        iso = dates.dt.isocalendar()
        return iso['week'], iso['year']
    return getattr(dates.dt, interval), dates.dt.year


def consolidate(df_daily, df_interval, interval):
    """Outer-join ``df_interval`` onto ``df_daily`` and return a new frame.

    Args:
        df_daily: frame with a datetime ``'date'`` column.
        df_interval: frame to attach.
            * If it is empty, its ``interval`` column is renamed to ``'date'``
              and it is outer-merged on ``'date'``.
            * For ``interval == 'date'`` it is outer-merged on ``'date'``.
            * For ``'week'``, ``'month'`` or ``'quarter'``, each row's datetime
              ``'date'`` is moved to the *following* period. The row is then
              joined to every daily row that falls in that period. Its own
              ``'date'`` column is dropped.
        interval: ``'date'``, ``'week'``, ``'month'`` or ``'quarter'``.

    Returns:
        The merged frame with ``'date'`` as its first column. The helper
        period/year key columns are not included. Neither input frame is
        modified.

    Raises:
        ValueError: if ``df_interval`` is non-empty and ``interval`` is not
            one of the supported values.
    """
    # Work on copies: key columns are added below and must not leak into the
    # caller's frames (the previous version mutated both inputs in place).
    df_daily = df_daily.copy()
    df_interval = df_interval.copy()

    if df_interval.empty:
        df_interval = df_interval.rename(columns={interval: 'date'})
        df_consolidated = df_daily.merge(df_interval, on=['date'], how='outer')
    elif interval == 'date':
        df_consolidated = df_daily.merge(df_interval, on=['date'], how='outer')
    else:
        if interval not in ('week', 'month', 'quarter'):
            raise ValueError(
                "unsupported interval {!r}; expected 'date', 'week', "
                "'month' or 'quarter'".format(interval))
        dates = df_interval.pop('date')
        if interval == 'week':
            # The week after an ISO week is simply the week containing the
            # date 7 days later; this is correct for 53-week ISO years, which
            # the old `week % 52 + 1` arithmetic was not.
            period, year = _period_and_year(dates + pd.Timedelta(days=7), 'week')
        else:
            n_periods = _PERIODS_PER_YEAR[interval]
            period, year = _period_and_year(dates, interval)
            wraps = period == n_periods
            period = period % n_periods + 1
            # The last period of a year rolls over into period 1 of next year.
            year = year.where(~wraps, year + 1)
        df_interval[interval] = period
        df_interval['year'] = year
        df_daily[interval], df_daily['year'] = _period_and_year(
            df_daily['date'], interval)
        df_consolidated = df_daily.merge(
            df_interval, on=[interval, 'year'], how='outer')
        df_consolidated = df_consolidated.drop(columns=['year', interval])

    # Present 'date' as the leading column.
    date_col = df_consolidated.pop('date')
    df_consolidated.insert(0, 'date', date_col)
    return df_consolidated

# Columns kept from each yfinance download.
_ACTION_COLUMNS = ['Dividends', 'Stock Splits']
_PRICE_COLUMNS = ['Open', 'High', 'Low', 'Close', 'Adj Close', 'Volume']

# Day 0 of Excel's serial-date numbering: (date - _EXCEL_EPOCH).days equals
# date.toordinal() - 693594, the offset the previous version used.
_EXCEL_EPOCH = pd.Timestamp('1899-12-30')


def _fetch_with_retry(fetch, description, max_attempts=5, delay=1):
  """Return the first non-empty frame produced by ``fetch()``, or None.

  Both exceptions and empty results count as failed attempts. ``fetch`` is
  called at most ``max_attempts`` times, sleeping ``delay`` seconds between
  attempts, and every failure is logged as a warning.
  """
  for attempt in range(1, max_attempts + 1):
    try:
      frame = fetch()
    except Exception as err:  # any error is retried, but Ctrl-C still works
      logging.warning('%s: attempt %d/%d failed: %s',
                      description, attempt, max_attempts, err)
    else:
      if len(frame) > 0:
        return frame
      logging.warning('%s: attempt %d/%d returned no rows',
                      description, attempt, max_attempts)
    if attempt < max_attempts:
      sleep(delay)
  return None


def _to_daily_frame(frame):
  """Move the 'Date' index into a 'date' column reduced to the calendar day.

  The date is formatted as YYYY-MM-DD and parsed back, which drops any
  time-of-day and time-zone component so that frames from different
  downloads merge on equal keys. Rows are returned sorted by date.
  """
  frame = frame.reset_index().rename(columns={'Date': 'date'})
  frame['date'] = pd.to_datetime(frame['date'].dt.strftime('%Y-%m-%d'))
  return frame.sort_values(by=['date'])


def _fetch_ticker(ticker):
  """Download dividends/splits and daily prices for ``ticker``, merged on date.

  Returns one frame with a leading 'date' column and every other column
  suffixed with ``_<ticker>``, or None if either download produced no data.
  """
  actions = _fetch_with_retry(
      lambda: yf.Ticker(ticker).history(period='max')[_ACTION_COLUMNS],
      '{} dividends/splits'.format(ticker))
  # NOTE(review): recent yfinance releases default auto_adjust to True, which
  # omits 'Adj Close'; request unadjusted data explicitly so it is present.
  prices = _fetch_with_retry(
      lambda: yf.download(ticker, auto_adjust=False)[_PRICE_COLUMNS],
      '{} prices'.format(ticker))
  if actions is None or prices is None:
    return None
  ticker_data = consolidate(
      _to_daily_frame(actions), _to_daily_frame(prices), 'date')
  ticker_data.columns = [
      c if c == 'date' else '{}_{}'.format(c, ticker)
      for c in ticker_data.columns]
  return ticker_data


def run(argv=None, save_main_session=True):
  """Build one CSV of Yahoo Finance history for the tickers listed in --input.

  --input is read with pandas.read_csv and must contain a 'ticker' column.
  Steps:
    * For each ticker, the dividend/split history and the daily price history
      are merged on date.
    * All tickers are outer-joined on date.
    * Rows are sorted by date and forward-filled.
    * 'date' is replaced by an Excel serial day number, stored in a trailing
      'date_num' column. Rows with date_num <= 0 are dropped.
  The CSV is written to --output as a single file by a Beam WriteToText step.
  Tickers whose downloads keep failing are skipped with a warning.

  Args:
    argv: command-line arguments; unrecognised ones are passed on to Beam.
    save_main_session: value for Beam's SetupOptions.save_main_session.

  Raises:
    RuntimeError: if no ticker produced any data.
  """
  parser = argparse.ArgumentParser()
  parser.add_argument(
      '--input',
      dest='input',
      required=True,
      help='Input file to process.')
  parser.add_argument(
      '--output',
      dest='output',
      required=True,
      help='Output file to write results to.')
  known_args, pipeline_args = parser.parse_known_args(argv)

  pipeline_options = PipelineOptions(pipeline_args)
  pipeline_options.view_as(SetupOptions).save_main_session = save_main_session

  # All downloading and merging happens here, in the launching process, with
  # pandas; Beam is only used to write the finished CSV.
  ticker_frames = []
  for ticker in pd.read_csv(known_args.input)['ticker']:
    ticker_data = _fetch_ticker(ticker)
    if ticker_data is None:
      logging.warning('Skipping %s: no data downloaded', ticker)
      continue
    ticker_frames.append(ticker_data)

  if not ticker_frames:
    raise RuntimeError(
        'No data downloaded for any ticker in {}'.format(known_args.input))

  data = ticker_frames[0]
  for frame in ticker_frames[1:]:
    data = consolidate(data, frame, 'date')

  data['date_num'] = (data['date'] - _EXCEL_EPOCH).dt.days
  data = data[data['date_num'] > 0]
  data = data.sort_values(by=['date']).ffill()
  data = data.drop(columns=['date'])

  # to_csv ends with a newline and WriteToText appends one per element, so
  # strip it to avoid a trailing blank line.
  csv_text = data.to_csv(index=False).rstrip('\r\n')

  # The pipeline runs when the with-block exits; shard_name_template='' makes
  # WriteToText produce exactly one file at --output.
  with beam.Pipeline(options=pipeline_options) as p:
    _ = (
        p
        | 'Create' >> beam.Create([csv_text])
        | 'Write' >> WriteToText(known_args.output, shard_name_template=''))

if __name__ == '__main__':
  # Lower the root logger's threshold so INFO-level messages are emitted when
  # this file is executed as a script.
  root_logger = logging.getLogger()
  root_logger.setLevel(logging.INFO)
  run()

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
/content/drive/My Drive/Colab Notebooks
