<a href="https://colab.research.google.com/github/mmistroni/jupyter/blob/master/ApacheBeamNotebook.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

<h3> Installing apache beam </h3>

In [None]:
!pip install apache-beam==2.19.0
#!pip install google-cloud-bigquery==0.25.0
#!pip install google-cloud-dataflow==2.4.0


<h3> Setting google env variables </h3>

In [None]:
import os
# Deployment settings -- edit these three values for your own GCP environment.
PROJECT = "datascience-projects" # REPLACE WITH YOUR PROJECT ID
BUCKET = "mm_dataflow_bucket" # REPLACE WITH YOUR BUCKET NAME
REGION = "us-central1" # REPLACE WITH YOUR BUCKET REGION e.g. us-central1

# Do not change this: the values are exported as environment variables so that
# shell cells can read them as $PROJECT, $BUCKET and $REGION.
os.environ.update({"PROJECT": PROJECT, "BUCKET": BUCKET, "REGION": REGION})


In [None]:
%%bash
gcloud config set project $PROJECT
gcloud config set compute/region $REGION

<h3> Starting. Importing packages </h3>

In [None]:
import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.metrics import Metrics
from apache_beam.metrics.metric import MetricsFilter
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions

from itertools import groupby

<h3> Creating directory to hold data </h3>

In [None]:
!mkdir -p data

In [None]:
!ls

In [None]:
# Importing data into colab
from google.colab import files
uploaded = files.upload()

In [None]:
!mv dept-data.txt data

In [None]:
!ls data
!pwd

<h3> Google Authentication </h3>

In [None]:
from google.colab import auth
auth.authenticate_user()

In [None]:
from apache_beam.io.gcp.internal.clients import bigquery
# BigQuery table reference inside the configured PROJECT.
table_spec = bigquery.TableReference(
    tableId='test_edgar_data',
    datasetId='gcp_edgar',
    projectId=PROJECT)
# Two STRING columns: source and quote.
table_schema = 'source:STRING, quote:STRING'




In [None]:

test_buckt = 'gs://mm_dataflow_bucket/'
p2 = beam.Pipeline()

# Seed the pipeline with two in-memory rows, each a dict with 'source' and 'quote'.
lines = p2 | beam.Create([
    dict(source='Mahatma Gandhi', quote='My life is my message.'),
    dict(source='Yoda', quote="Do, or do not. There is no 'try'."),
])
#| 'Filter perennials' >> beam.Filter(
#     lambda row: len(row.split(',')) > 3)

# Print every row as it flows through the pipeline.
lines = lines | 'sending to putput' >> beam.Map(print)
#| beam.io.WriteToText('{}{}'.format(test_buckt, 'cutCreate1'))

p2.run()
# visualize output
#!({'head -n 20 data/cutCreate1-00000-of-00001'})

# See this link for writing to GCS: https://colab.research.google.com/notebooks/io.ipynb#scrollTo=0ENMqxq25szn


<h3> Edgar MasterIDX URL generation Pipeline </h3>

In [None]:
# Building blocks for the EDGAR full-index directory URLs.
quarters = ['QTR1', 'QTR2', 'QTR3', 'QTR4']
full_dir = "https://www.sec.gov/Archives/edgar/full-index/{year}/{QUARTER}/"
output_bucket =  "gs://mm_dataflow_bucket/outputs"


def get_edgar_urls(years:list) :
    """Build the EDGAR full-index directory URL for every quarter of every year.

    Args:
        years: the years to expand; each one is substituted into ``full_dir``.

    Returns:
        A list of directory URLs, ordered by year and then by quarter.
    """
    print('fetching master.idx for year {}'.format(years))
    edgar_dirs = []
    for year in years:
        for qtr in quarters:
            directory = full_dir.format(year=year, QUARTER=qtr)
            edgar_dirs.append('{}'.format(directory))
    return edgar_dirs



In [None]:
import requests

import shutil
import requests
import sys
from pprint import pprint
quarters = ['QTR1', 'QTR2', 'QTR3', 'QTR4']
full_dir = "https://www.sec.gov/Archives/edgar/full-index/{year}/{QUARTER}/"


# Using Beautiful soup
import re, requests
from bs4 import BeautifulSoup

def processUrl(url):
  """Return ``url`` when it refers to a ``master.idx`` file, otherwise ``None``.

  Args:
    url: an href value; may be ``None`` or empty when an anchor has no href.

  Returns:
    ``url`` unchanged if it contains ``'master.idx'``, else ``None``.
  """
  # Guard against None/empty hrefs: `'master.idx' in None` raises TypeError.
  if url and 'master.idx' in url:
    return url
  return None

def crawl(base_page):
  """Fetch a listing page and return the full URLs of its master.idx links.

  Args:
    base_page: URL of the page to fetch; matching hrefs are appended to it.

  Returns:
    A list of ``base_page + href`` strings for every anchor accepted by
    ``processUrl``. The list is empty when the response status is not 200
    or when no anchor matches.
  """
  req=requests.get(base_page)
  good_ones = []
  if req.status_code != 200:
    # Always hand back a list so callers can flatten the results safely.
    return good_ones
  html=BeautifulSoup(req.text,'html.parser')
  for page in html.find_all('a'):
    url=page.get('href')
    if not url:
      # Anchors without an href yield None; nothing to match against.
      continue
    res = processUrl(url)
    if res:
      full_url = '{}{}'.format(base_page, res)
      print('Appending..:{}'.format(full_url))
      good_ones.append(full_url)
  return good_ones

def generate_master_urls(all_url):
    """Crawl every URL in ``all_url`` and return one flat list of the matches.

    Args:
        all_url: iterable of page URLs, each passed to ``crawl``.

    Returns:
        A single list containing the items of every ``crawl`` result, in order.
    """
    from itertools import chain
    # Materialise the results first: pprint on a lazy map object only shows its
    # repr. A None result from crawl is treated as "no matches".
    res = [crawl(u) or [] for u in all_url]
    pprint(res)
    return list(chain.from_iterable(res))

def generate_edgar_urls_for_year(year):
    """Return the master.idx URLs found in the four quarterly index pages of ``year``.

    Args:
        year: value substituted into each quarterly directory URL.

    Returns:
        Whatever ``generate_master_urls`` returns for the four quarter URLs.
    """
    quarter_templates = (
        'https://www.sec.gov/Archives/edgar/full-index/{}/QTR1/',
        'https://www.sec.gov/Archives/edgar/full-index/{}/QTR2/',
        'https://www.sec.gov/Archives/edgar/full-index/{}/QTR3/',
        'https://www.sec.gov/Archives/edgar/full-index/{}/QTR4/',
    )
    # Lazily fill in the year; the URLs are consumed once by generate_master_urls.
    quarter_urls = (template.format(year) for template in quarter_templates)
    return generate_master_urls(quarter_urls)




In [None]:
generate_edgar_urls_for_year('2019')

In [None]:
from past.builtins import unicode
class FileExtractingDoFn(beam.DoFn):
  """DoFn that crawls a listing page URL and emits the master.idx URLs it links to."""

  def __init__(self):
    # TODO(BEAM-6158): Revert the workaround once we can pickle super() on py3.
    # super(FileExtractingDoFn, self).__init__()
    beam.DoFn.__init__(self)

  def read_file(self, url, local_filename=None):
    """Stream ``url`` to a local file and return the path that was written.

    Args:
      url: address of the file to download.
      local_filename: destination path; defaults to the last path segment
        of ``url``.

    Returns:
      The local path the content was written to.

    Raises:
      Whatever ``raise_for_status`` raises for an error response.
    """
    if local_filename is None:
      # The name was previously undefined here; derive it from the URL.
      local_filename = url.rstrip('/').split('/')[-1]
    with requests.get(url, stream=True) as r:
      r.raise_for_status()
      print('Writing to:{}'.format(local_filename))
      with open(local_filename, 'wb') as f:
        for chunk in r.iter_content(chunk_size=8192):
          if chunk: # filter out keep-alive new chunks
            f.write(chunk)
    return local_filename

  def processUrl(self, url):
    """Return ``url`` if it contains ``'master.idx'``, otherwise ``None``."""
    # Anchors without an href give None; `in` on None would raise TypeError.
    if url and 'master.idx' in url:
      return url
    return None

  def crawl(self, base_page):
    """Fetch ``base_page`` and return ``base_page + href`` for each master.idx link.

    Returns an empty list when the response status is not 200.
    """
    req=requests.get(base_page)
    good_ones = []
    if req.status_code != 200:
      return good_ones
    html=BeautifulSoup(req.text,'html.parser')
    for page in html.find_all('a'):
      # Use this class's own matcher rather than the module-level function.
      res = self.processUrl(page.get('href'))
      if res:
        full_url = '{}{}'.format(base_page, res)
        print('Appending..:{}'.format(full_url))
        good_ones.append(full_url)
    return good_ones

  def process(self, element):
    """Crawl one page URL and emit the master.idx URLs found on it.

    Args:
      element: the page URL being processed.
    Returns:
      A list of full master.idx URLs (possibly empty).
    """
    print('Processing {}'.format(element))
    return self.crawl(element)

p3 = beam.Pipeline()
test_buckt = 'gs://mm_dataflow_bucket/'
lines = (
     p3
     # Seed with the quarterly index *directory* URLs. FileExtractingDoFn does the
     # crawling itself; seeding with generate_edgar_urls_for_year would hand it
     # already-resolved master.idx URLs (pipe-delimited index files, not link
     # listings), so the crawl step would have nothing to extract.
     | 'generate edgar url' >>beam.Create(get_edgar_urls(['2019']))
     # Each directory URL fans out to the master.idx URLs linked from it.
     | 'extract master idx files' >> (beam.ParDo(FileExtractingDoFn())
                                          .with_output_types(unicode))
     | 'sending to putput' >> beam.Map(print)
     #beam.io.WriteToText('{}{}'.format('data/', 'cutCreate1'))
)
p3.run()
# visualize output
#!({'head -n 20 data/cutCreate1-00000-of-00001'})



<h2> Test pipeline that reads and parses remote filings </h2>

In [None]:
from apache_beam.io import WriteToText
from apache_beam.io.textio import ReadAllFromText
import urllib
from collections import defaultdict
from datetime import date, datetime
from itertools import groupby
p4 = beam.Pipeline()
test_bucket = 'gs://mm_dataflow_bucket/'
form_type = '13F-HR'
# Timestamped output name such as "13F-HR_20200131-0930".
# The month directive is %m; the previous "$m" was emitted literally.
filename = '{}_{}'.format(form_type, datetime.now().strftime('%Y%m%d-%H%M'))


class ReadRemote(beam.DoFn):
  """DoFn that opens a URL and emits its content one raw line at a time."""

  def process(self, element):
    """Download ``element`` and return its lines.

    Args:
      element: the URL to open.

    Returns:
      A list with one item per line, exactly as yielded by the response
      object (not decoded, line endings kept).
    """
    # `import urllib` alone does not load the `request` submodule; import it
    # explicitly instead of relying on another library having done so.
    import urllib.request
    print('REadRemote processing///{}'.format(element))
    # The response works like a file; close it once every line has been read.
    with urllib.request.urlopen(element) as data:
      return list(data)

class ParseForm13F(beam.DoFn):
  """DoFn that downloads a filing and emits the text of each cusip element in it."""

  def open_url_content(self, file_path):
    """Fetch ``file_path`` with ``requests.get`` and return the response."""
    import requests
    print('Attepmting to open:{}'.format(file_path))
    return requests.get(file_path)

  def get_cusips(self, content):
    """Extract cusip values from the last ``<XML>...</XML>`` section of a filing.

    Args:
      content: object whose ``text`` attribute holds the filing as a string.

    Returns:
      A list with the text of every grandchild element of the XML root whose
      tag contains ``'cusip'``.
    """
    from xml.etree import ElementTree
    data = content.text.replace('\n', '')
    # Keep only what sits between the last <XML> tag and the last </XML> tag.
    subset = data[data.rfind('<XML>') + 5: data.rfind("</XML>")]
    root = ElementTree.fromstring(subset)
    # Element.getchildren() was removed in Python 3.9; iterate elements directly.
    return [child.text for infoTable in root for child in infoTable
            if 'cusip' in child.tag]

  def _group_data(self, lst):
    """Count how many times each value occurs in ``lst``.

    Returns:
      A ``defaultdict(list)`` mapping each distinct value to a one-item list
      holding its occurrence count; empty when ``lst`` is empty or None.
    """
    all_dict = defaultdict(list)
    if lst:
      print('Attempting to group..')
      # groupby only merges adjacent equal items, so sort first.
      for k, g in groupby(sorted(lst)):
        grp = len(list(g))
        if grp > 1:
          print('{} has {}'.format(k, grp))
        all_dict[k].append(grp)
    # Previously the grouped result was built and then discarded.
    return all_dict

  def process(self, element):
    """Fetch the filing at ``element`` and emit its cusip values.

    Best effort: any failure is logged and an empty list is returned so a
    single bad filing does not stop the pipeline.
    """
    try:
      file_content = self.open_url_content(element)
      all_cusips = self.get_cusips(file_content)
      #self._group_data(all_cusips)
      #print('Found:{} in Processing {}'.format(len(all_cusips), element))
      return all_cusips
    except Exception as e:
      print('could not fetch data from {}:{}'.format(element, str(e)))
      return []

import requests
def format_string(input_str):
  """Remove every ``b'`` sequence and every single quote, then trim whitespace.

  Args:
    input_str: the string to clean.

  Returns:
    The cleaned string with leading and trailing whitespace stripped.
  """
  without_prefix = input_str.replace("b'", "")
  without_quotes = without_prefix.replace("'", "")
  cleaned = str(without_quotes)
  return cleaned.strip()

def cusip_to_ticker(cusip):
  """Look up the ticker for ``cusip`` via the cusip2ticker HTTP endpoint.

  Args:
    cusip: the CUSIP appended to the endpoint URL.

  Returns:
    The ``'ticker'`` field of the JSON response, cleaned with
    ``format_string``. Best effort: on any failure the error is logged and
    an empty string is returned.
  """
  cusip_url = "https://us-central1-datascience-projects.cloudfunctions.net/cusip2ticker/{fullCusip}".format(fullCusip=cusip)
  try:
    req=requests.get(cusip_url).json()
    return format_string(req['ticker'])
  except Exception as e:
    # Include the cause so failed lookups can be diagnosed from the log.
    print('Unable to retrieve ticker for {}:{}'.format(cusip, str(e)))
    return ''

## BIG QUERY SCHEMA
from apache_beam.io.gcp.internal.clients import bigquery
# BigQuery table reference inside the configured PROJECT.
edgar_table_spec = bigquery.TableReference(
    tableId='form_13hf_data',
    datasetId='gcp_edgar',
    projectId=PROJECT)
# Columns: COB, CUSIP and TICKER as STRING, COUNT as INTEGER.
edgar_table_schema = 'COB:STRING, CUSIP:STRING, COUNT:INTEGER, TICKER:STRING'




In [None]:
# Pipeline: read one quarterly master.idx, keep the rows whose form-type field
# contains `form_type`, fetch each matching filing and count how often every
# extracted cusip value occurs. Results are printed; the BigQuery and text
# sinks below are currently disabled.
p4 = beam.Pipeline()
lines = (
     p4
     #| 'generate master url' >>beam.Create(['https://www.sec.gov/Archives/edgar/full-index/2019/QTR1/master.idx'])
     | 'Sampling Data' >> beam.Create(['https://www.sec.gov/Archives/edgar/full-index/2019/QTR1/master.idx',
                    #'https://www.sec.gov/Archives/edgar/full-index/2019/QTR2/master.idx'
                    ])
     # ReadRemote emits the raw lines of the downloaded index file.
     | 'readFromText' >> beam.ParDo(ReadRemote())
     # NOTE(review): lines are presumably bytes here, so str() yields their
     # "b'...\n'" repr -- which is why 'replacing eol' below searches for a
     # literal backslash followed by 'n'. Confirm before changing either stage.
     | 'map to Str'   >> beam.Map(lambda line:str(line))
     # Keep rows with more than 4 '|'-separated fields whose 3rd field contains form_type.
     | 'Filter only form 13HF' >> beam.Filter(lambda row: len(row.split('|')) > 4 and form_type in row.split('|')[2])
     # The 5th field is appended to the archive root to form the filing URL.
     | 'Generating Proper file path' >> beam.Map(lambda row: '{}/{}'.format('https://www.sec.gov/Archives', row.split('|')[4]))
     # Cut the string at the first literal backslash-n sequence.
     | 'replacing eol' >> beam.Map(lambda p: p[0:p.find('\\n')])
     #| 'sampling lines' >> beam.transforms.combiners.Sample.FixedSizeGlobally(10)
     #| 'flat Mapping' >> beam.Map(lambda elements: elements[0])
     # ParseForm13F emits one element per cusip found in the filing.
     | 'parsing edgar filing' >> beam.ParDo(ParseForm13F())
     # Produces (value, occurrence count) pairs.
     | 'Combining similar' >> beam.combiners.Count.PerElement()
     # Identity mapping of each (word, count) pair.
     | 'Groupring' >> beam.MapTuple(lambda word, count: (word, count))
     #| 'sampling again' >> beam.transforms.combiners.Sample.FixedSizeGlobally(20)
     #| 'Adding Cusip' >> beam.MapTuple(lambda word, count: (word, cusip_to_ticker(word), count))
     #| 'Filtering' >> beam.Filter(lambda tpl: tpl[1] > 300)
     #| 'Creating BigQuery Data' >> beam.MapTuple(lambda word, ticker, count: dict(COB=date.today().strftime('%Y-%m-%d'), CUSIP=word, TICKER=ticker,COUNT=count))
     #| 'Write to BigQuery' >> beam.io.WriteToBigQuery(
     #                                       edgar_table_spec,
     #                                       schema=edgar_table_schema,
     #                                       write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
     #                                       create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
     | 'sending to out' >> beam.Map(print)
     #| beam.io.WriteToText('{}{}'.format(test_bucket, filename))
     #|  beam.io.WriteToText('cutCreate1-00000-of-00001')
)
p4.run()
# visualize output
#!({'head -n 20 cutCreate1-00000-of-00001'})


In [None]:
!({'head -n 20 data/cutCreate1-00000-of-00001-00000-of-00001'})

In [None]:
!ls

<h3> Fetching Ticker from CUSIP </h3>

In [None]:
# Sample CUSIPs used to exercise cusip_to_ticker by hand.
cusips = [
    '00401C108', '00404A109', '004225108', '004239109',
    '00434H108', 'G1151C101', '00081T108',
]

# One lookup per CUSIP; a failed lookup prints an empty ticker.
for c in cusips:
  ticker = cusip_to_ticker(c)
  report_line = 'CUSIP:{}|TICKER:{}'.format(c, ticker)
  print(report_line)


<h3> Other data to join in </h3>


1.   Price Targets: GET /stock/{symbol}/price-target (500 messages per symbol)
2.   Estimates: https://iexcloud.io/docs/api/#estimates (10000 messages per symbol)
3.   Key Stats: GET /stock/{symbol}/stats/{stat?} (day200MovingAvg, peRatio, beta, ytdChangePercent)
4. 



<h3>Price Targets </h3>

In [None]:
from google.colab import drive
drive.mount('/content/gdrive')

def get_iexapi_keys(key_file='gdrive/My Drive/passwords/iexapi.keys'):
  """Return the API token stored on the first line of ``key_file``.

  Args:
    key_file: path of the key file; defaults to the file on the mounted drive.

  Returns:
    The first line of the file with surrounding whitespace removed.

  Raises:
    ValueError: if the first line is empty or blank.
  """
  with open(key_file) as f:
    # Strip the trailing newline: the token is interpolated into URLs as-is.
    token = f.readline().strip()
  if not token:
    raise ValueError('No API token found in {}'.format(key_file))
  return token

In [None]:
import requests
def get_price_targets(symbol):
  """Call the price-target endpoint for ``symbol`` and return the decoded JSON."""
  url_template = 'https://cloud.iexapis.com/stable/stock/{symbol}/price-target?token={token}'
  api_token = get_iexapi_keys()
  response = requests.get(url_template.format(symbol=symbol, token=api_token))
  return response.json()

def get_analysts_recommendation_trends(symbol):
  """Call the recommendation-trends endpoint for ``symbol`` and return the decoded JSON."""
  url_template = 'https://cloud.iexapis.com/stable/stock/{symbol}/recommendation-trends?token={token}'
  api_token = get_iexapi_keys()
  response = requests.get(url_template.format(symbol=symbol, token=api_token))
  return response.json()

# Call the function by its defined name; `get_analysts_recommendation` does not exist.
get_analysts_recommendation_trends('VZ')

<h3> Estimates </h3>

In [None]:
def get_estimates(symbol):
  """Call the estimates endpoint for ``symbol`` and return the decoded JSON."""
  url_template = 'https://cloud.iexapis.com/stable/stock/{symbol}/estimates?token={token}'
  api_token = get_iexapi_keys()
  response = requests.get(url_template.format(symbol=symbol, token=api_token))
  return response.json()

get_estimates('JNJ')

In [None]:
from apache_beam.transforms.combiners import Sample

In [None]:
# Test read remote file
def open_url_content(file_path):
  """Issue an HTTP GET for ``file_path`` and return the response object."""
  import requests
  response = requests.get(file_path)
  return response

def get_cusips(file_content):
  """Extract cusip values from the last ``<XML>...</XML>`` section of a filing.

  Args:
    file_content: object whose ``text`` attribute holds the filing as a string
      (for example the response returned by ``open_url_content``).

  Returns:
    A list with the text of every grandchild element of the XML root whose
    tag contains ``'cusip'``.
  """
  from xml.etree import ElementTree
  # Read from the parameter; the body previously used an undefined name `content`.
  data = file_content.text.replace('\n', '')
  # Keep only what sits between the last <XML> tag and the last </XML> tag.
  subset = data[data.rfind('<XML>') + 5: data.rfind("</XML>")]
  root = ElementTree.fromstring(subset)
  # Element.getchildren() was removed in Python 3.9; iterate elements directly.
  return [child.text for infoTable in root for child in infoTable
          if 'cusip' in child.tag]

In [None]:
# Sorting a pipeline in python
p2 = beam.Pipeline()
lines = (
     p2
     | beam.Create([('Test', 1), ('Another', 5), ('Third', 4)])
     # The elements are (name, value) tuples, so the previous string-splitting
     # filter failed on them. To sort, gather everything into a single list
     # first and then order that list by the numeric value.
     | 'Collecting values' >> beam.combiners.ToList()
     | 'Sorting values' >> beam.Map(lambda rows: sorted(rows, key=lambda row: row[1]))
     | 'sending to putput' >> beam.Map(print)
     #| beam.io.WriteToText('{}{}'.format(test_buckt, 'cutCreate1'))
)
p2.run()

<h3> Testing a Pipeline for retrieving shares </h3>

In [None]:
!pip install pandas-datareader

In [None]:
import json
import pandas as pd
from pandas.tseries.offsets import BDay
import pandas_datareader.data as dr
import numpy as np
from datetime import datetime, date
import requests


In [None]:
def get_all_shares_dataframe():
  """Fetch the share records and return those with ``QTY`` above 1 as a DataFrame."""
  response = requests.get('https://k1k1xtrm88.execute-api.us-west-2.amazonaws.com/test/query-shares')
  # Keep only the records whose quantity is greater than one.
  held = list(filter(lambda d: d['QTY'] > 1, response.json()))
  return pd.DataFrame.from_dict(held)

In [None]:
def get_latest_price_yahoo(symbol, cob_date):
  """Return the adjusted close of ``symbol`` on ``cob_date`` and the prior business day.

  Args:
    symbol: ticker to look up.
    cob_date: the date to price; the comparison date is one business day earlier.

  Returns:
    A flat dict with keys ``'symbol'``, ``'Adj Close_t'``, ``'Adj Close_y'``
    and ``'diff'``. Best effort: on any failure the error is logged and the
    three numeric fields are 0.
  """
  try:
    print('--latest price for{}'.format(symbol))
    start_date = cob_date - BDay(1)
    dfy = dr.get_data_yahoo(symbol, start_date, start_date)[['Adj Close']]
    dft = dr.get_data_yahoo(symbol, cob_date, cob_date)[['Adj Close']]
    dfy['symbol'] = symbol
    dft['symbol'] = symbol
    print(dfy.shape)
    print(dft.shape)

    merged = pd.merge(dft, dfy,on='symbol', suffixes=('_t', '_y'),)
    merged['diff'] = merged['Adj Close_t'] - merged['Adj Close_y']
    print('Merged shap eis:{}'.format(merged.shape))
    return merged.iloc[0].to_dict()
  except Exception as e :
    print('Unable to find data for {}:{}'.format(symbol, str(e)))
    # Same flat shape as the success path (previously a nested dict-of-dicts).
    return {'symbol': symbol, 'Adj Close_t': 0, 'Adj Close_y': 0, 'diff': 0}

def get_prices(symbols):
  """Price every symbol as of today and return one DataFrame row per symbol.

  Args:
    symbols: iterable of tickers.

  Returns:
    A DataFrame with columns ``symbol``, ``Adj Close_t``, ``Adj Close_y`` and
    ``diff``; it has no rows when ``symbols`` is empty.
  """
  # Each lookup yields a dict, so build the frame from records; pd.concat
  # only accepts Series/DataFrame objects and rejected these dicts.
  records = [get_latest_price_yahoo(symbol, date.today()) for symbol in symbols]
  return pd.DataFrame(records, columns=['symbol', 'Adj Close_t', 'Adj Close_y', 'diff'])



In [None]:
class FetchPortfolio(beam.DoFn):
  """DoFn that opens a URL and emits its content one raw line at a time.

  Planned (per the original note): return share name, ticker, quantity and
  original price from one call.
  """

  def process(self, element):
    """Download ``element`` and return its lines exactly as the response yields them."""
    # 1 call retur sharename, ticker, qty, original price
    # `import urllib` alone does not load the `request` submodule; import it explicitly.
    import urllib.request
    print('REadRemote processing///{}'.format(element))
    # The response works like a file; close it once every line has been read.
    with urllib.request.urlopen(element) as data:
      return list(data)

class FetchPrices(beam.DoFn):
  """DoFn that opens a URL and emits its content one raw line at a time.

  Planned (per the original note): for each ticker return today's close,
  yesterday's close and the difference.
  """

  def process(self, element):
    """Download ``element`` and return its lines exactly as the response yields them."""
    # for each ticker return close today, close yesterday dif
    # `import urllib` alone does not load the `request` submodule; import it explicitly.
    import urllib.request
    print('REadRemote processing///{}'.format(element))
    # The response works like a file; close it once every line has been read.
    with urllib.request.urlopen(element) as data:
      return list(data)
  
class Accumulator(beam.DoFn):
  """Placeholder DoFn; planned to generate one row per ticker with all needed data."""

  def process(self, element):
    # Not implemented yet: nothing is emitted for any element.
    return None


In [None]:
get_latest_price_yahoo('AAPL', date.today()-BDay(1))


In [None]:
all_shares_df = get_all_shares_dataframe()
symbols = all_shares_df['TICKER'].values[0:20]
prices_data = get_prices(symbols)
pd.merge(all_shares_df, prices_data, left_on='TICKER', right_on='symbol')




In [None]:
# Scratch pipeline seeded with three ticker symbols.
p2 = beam.Pipeline()
lines = (
     p2
     | beam.Create(['AAPL', 'MSFT', 'AMZN'])
     # NOTE(review): none of the symbols contains a comma, so split(',') yields a
     # single field and the `> 3` predicate rejects every element -- nothing
     # reaches the print step. The label and predicate look copied from an
     # earlier cell; confirm the intended transform.
     | 'Sorting values perennials' >> beam.Filter(
          lambda row: len(row.split(',')) > 3)
     | 'sending to putput' >> beam.Map(print)
     #| beam.io.WriteToText('{}{}'.format(test_buckt, 'cutCreate1'))
)
p2.run()

In [None]:
# NOTE(review): `res` is not assigned in any cell visible here, so this cell
# presumably depends on leftover kernel state -- confirm where it should come from.
res.diff(axis=1, periods=-1)
