In [None]:
from collections import namedtuple
import pandas as pd
import pyspark.sql.functions as f
import websocket
import datetime
import os
import json
import requests
import re
import time

In [None]:
# Root of the Valsys REST API. Endpoints below are built by appending a relative
# path (e.g. BASE_URL + "users/login"), so the trailing slash is required.
os.environ["BASE_URL"] = "https://valsys.iai.cppib.io/"
def authenticate(username, password, timeout=60):
    """Log in to Valsys and store the access token in ``os.environ["TOKEN"]``.

    The other helpers in this notebook read the bearer token from that
    environment variable, so this must succeed before any authenticated call.

    Args:
        username: Valsys account e-mail.
        password: Valsys account password.
        timeout: Seconds to wait for the login request before giving up.

    Returns:
        The access token on success. On a reported failure an error message is
        printed, ``None`` is returned, and ``os.environ["TOKEN"]`` is left as it
        was (possibly a stale token from an earlier login).
    """
    auth_url = os.environ["BASE_URL"] + "users/login"
    # Credentials are sent as request headers on a GET to the login endpoint.
    headers = {
        'username': username,
        'password': password
    }

    response = requests.request("GET", auth_url, headers=headers, data=None, timeout=timeout)
    # Plain dict access: a namedtuple object_hook raises ValueError for any JSON
    # key that is not a valid Python identifier, and AttributeError for a
    # missing "status"/"message".
    auth_response = json.loads(response.text)
    if auth_response.get("status") != "success":
        print("ERROR:", auth_response.get("message"))
        return None

    token = auth_response["data"]["AccessToken"]
    os.environ["TOKEN"] = token
    return token

class SocketHandler:
    """Create one Valsys model over a websocket and capture the outcome.

    ``run()`` blocks until the connection closes. Afterwards:

    * ``resp`` holds the final server message when one arrived with ``Close`` true;
    * ``error`` holds the server-reported error string, if any;
    * ``timeout`` is True if ``on_close`` saw a status code other than 1000;
    * ``state`` is ``"COMPLETE"`` once ``on_close`` has run.
    """

    def __init__(self, config) -> None:
        """Prepare (but do not open) the websocket that will send *config*.

        Reads ``BASE_URL`` and ``TOKEN`` from the environment, so
        ``authenticate`` must have succeeded first.
        """
        self.config = config
        self.error = None       # server-reported error message, if any
        self.timeout = False    # True after a close with a status other than 1000
        self.resp = None        # final server message on success
        self.state = "INPROGRESS"
        # enable trace in dev for debugging
        websocket.enableTrace(False)
        self.wsapp = websocket.WebSocketApp(
            self._socket_url(),
            on_open=self.create_model,
            on_message=self.msg_hanlder,
            on_close=self.on_close,
        )

    @staticmethod
    def _socket_url():
        """Build the model-creation websocket URL from BASE_URL and TOKEN."""
        base = os.environ["BASE_URL"]
        # Derive the websocket host from the REST host so the two cannot drift apart.
        if base.startswith("https://"):
            base = "wss://" + base[len("https://"):]
        elif base.startswith("http://"):
            base = "ws://" + base[len("http://"):]
        return base + "modeling/create/" + os.environ["TOKEN"]

    def create_model(self, ws):
        """Send the model configuration as JSON once the socket is open."""
        ws.send(json.dumps(self.config))

    def msg_hanlder(self, ws, message):
        """Handle one server message; close the socket on error or completion.

        The misspelled name is kept because it is the registered callback name.
        """
        response = json.loads(message)
        print(response.get("status"), response.get("step"))
        err = response.get("error")
        # A missing, null or empty "error" means "no error". Comparing only
        # against "" treated a missing key as an error and then raised KeyError
        # on response["error"], so such final messages were never recorded.
        if err:
            self.error = err
            self.on_close(ws, 1000, message)
        elif response.get("Close") == True:
            self.resp = response
            self.on_close(ws, 1000, message)

    def on_close(self, ws, close_status_code, close_msg):
        """Mark the run complete and close the socket.

        Called by ``msg_hanlder`` (with status 1000) and registered as the
        websocket-client close callback. Any status other than 1000, including
        ``None``, is reported as a network timeout and sets ``self.timeout``.
        """
        self.state = "COMPLETE"
        if close_status_code != 1000:
            print(close_status_code, close_msg)
            print("i/o network timeout, will retry")
            self.timeout = True
        elif close_status_code or close_msg:
            print("close status code: " + str(close_status_code))
        # NOTE(review): after msg_hanlder closes the socket, websocket-client
        # presumably calls this callback again during run_forever teardown,
        # possibly with close_status_code=None, which would set timeout even on
        # a clean finish. Confirm against the installed websocket-client version.
        ws.close()

    def run(self):
        """Open the socket and block until it closes.

        websocket-client sends a ping every ``ping_interval`` seconds and drops
        the connection if no pong arrives within ``ping_timeout``; the timeout
        must be smaller than the interval.
        """
        ping_interval = 30
        ping_timeout = (ping_interval * 9) / 10
        self.wsapp.run_forever(ping_interval=ping_interval, ping_timeout=ping_timeout, ping_payload="ping")
    
# Tag the machine models
def tag_models(modelID, tags, timeout=60):
    """Attach *tags* to the Valsys model *modelID*.

    Posts the tags with ``update`` and ``rollForward`` set to True. Requires a
    bearer token in ``os.environ["TOKEN"]`` (set by ``authenticate``).

    Args:
        modelID: id of the model to tag.
        tags: list of tag strings.
        timeout: seconds to wait for the request.

    Returns:
        The ``requests.Response``. A warning is printed when the request did not
        succeed, so a caller looping over many models keeps going.
    """
    headers = {
        "content-type": "application/json",
        "Authorization": "Bearer " + os.environ["TOKEN"]
    }

    url = os.environ["BASE_URL"] + "modeling/model-properties"
    body = {
        "modelID": modelID,
        "modelTags": tags,
        "update": True,
        "rollForward": True
    }
    response = requests.request("POST", url, headers=headers, data=json.dumps(body), timeout=timeout)
    # Surface failures (e.g. an expired token) instead of discarding the response.
    if not response.ok:
        print("WARNING: tagging model {} failed: HTTP {} {}".format(modelID, response.status_code, response.text))
    return response
    
# Share models with the team
def share_model(modelID, userEmail, permissions, timeout=60):
    """Share the Valsys model *modelID* with *userEmail*.

    Args:
        modelID: id of the model to share (sent as a header).
        userEmail: e-mail of the recipient (sent as a header).
        permissions: ``"view"`` grants read-only access; any other value grants
            edit access.
        timeout: seconds to wait for the request.

    Returns:
        The ``requests.Response``. Success or a warning is printed.
    """
    headers = {
        "content-type": "application/json",
        "Authorization": "Bearer " + os.environ["TOKEN"],
        "email": userEmail,
        "modelID": modelID
    }

    url = os.environ["BASE_URL"] + "users/share-model"

    # Decide from the argument itself. It must not be overwritten, and the
    # notebook-level global ``permission`` must not be read in its place.
    if permissions == "view":
        body = {"view": True}
    else:
        body = {"edit": True}

    response = requests.request("POST", url, headers=headers, data=json.dumps(body), timeout=timeout)
    if response.ok:
        print("Shared model with:", userEmail)
    else:
        print("WARNING: sharing model {} with {} failed: HTTP {} {}".format(
            modelID, userEmail, response.status_code, response.text))
    return response

In [None]:
# Delete the Valsys models recorded by a previous run in
# model_engine.valsys_machine_models, before that table is dropped below.
model_id_lst = []
for tbl in spark.catalog.listTables("model_engine"):
  if tbl.name == "valsys_machine_models":
    model_id_lst = [
      x.model_id
      for x in spark.read.table("model_engine.valsys_machine_models").select('model_id').drop_duplicates().collect()
    ]
    break

if model_id_lst:
  # SECURITY(review): plaintext credentials in the notebook; load them from a
  # secret store (e.g. a Databricks secret scope) and rotate this password.
  username = "overlord@cppib.com"
  password = "DDIOverlord!23"

  authenticate(username, password)
  headers = {
      "content-type": "application/json",
      "Authorization": "Bearer "+os.environ["TOKEN"]
  }

  url = os.environ["BASE_URL"] + "users/models"
  body = {
      "models": model_id_lst,
  }
  response = requests.request("DELETE", url, headers=headers, data=json.dumps(body), timeout=120)
  # Fail loudly: the following cells drop the table holding these ids, so
  # continuing after a failed delete would orphan the models.
  response.raise_for_status()
  print("models dropped")

In [None]:
# Drop the metastore entry of the tracking table; the preceding cell deletes the
# Valsys models whose ids it lists.
spark.sql("DROP TABLE if exists model_engine.valsys_machine_models")

In [None]:
# The tracking table is saved with an explicit S3 path (an external table), and
# dropping it presumably leaves the Delta files behind. Delete the directory so
# the table is recreated from scratch at the same path.
dbutils.fs.rm("s3a://cppib-iai-workspace/datalake/model_engine/valsys_machine_models.delta", recurse=True)

In [None]:
#cmd3 or cmd5
# Seed rows for machine models: one per distinct (ticker, key-driver source,
# template), enriched with the latest fiscal year and the bbg/ciq mapping.
df = (
    spark.read.table("model_engine.key_drivers")
    .filter("type = 'machine'")
    .select("ticker", "source", "template_id")
    .distinct()
)

# # check for new source

# for tbl in list(spark.catalog.listTables("model_engine")):
#   if "valsys_machine_models" == tbl.name:
#     df2 = spark.read.table("model_engine.valsys_machine_models").select('ticker', 'keydriver_source', 'templateID').drop_duplicates()
#     df = df.join(df2, [df2.keydriver_source == df.source], how="leftanti")

# Latest period with periodTypeId=1 (presumably annual). companyName is dropped
# here, so the later row.companyName presumably comes from bbg_ciq_map.
df1 = (
    spark.read.table("model_engine.latest_period")
    .filter('periodTypeId=1')
    .drop("companyName")
)
bbg_ciq_map = spark.read.table("model_engine.bbg_ciq_map")
# ciqcompany = spark.read.table("xpressfeed.ciqCompany").select("companyId", "simpleIndustryId").distinct()
# ciqindustry = spark.read.table("xpressfeed.ciq_industry_mapping").select("simpleIndustryId", "IndustryGroup")

# Left join keeps every seed, leaving fiscalYear null where no period matched;
# the inner join with the mapping then keeps only mapped tickers.
df_final = (
    df.join(df1, [df.ticker == df1.ticker], how="left")
    .select(df.ticker, df.source, df.template_id, df1.fiscalYear)
    .distinct()
    .join(bbg_ciq_map, ["ticker"])
)

# Collect as namedtuples (one shared record type) for attribute access downstream.
seed_row_type = namedtuple('X', df_final.columns)
model_seeds = [seed_row_type(*values) for values in df_final.collect()]

In [None]:
# One Valsys model-creation payload per seed row.
model_configurations = []
hist_period = 5   # historical years per model
proj_period = 11  # projected years per model
for row in model_seeds:
  if row.fiscalYear is None:
    # fiscalYear can be null (the seed cell left-joins latest_period); the year
    # arithmetic below would raise TypeError and abort the whole cell.
    print("Skipping ticker {} (source {}): no fiscal year".format(row.ticker, row.source))
    continue
  configuration = {
    "companyName": row.companyName,
    "ticker": row.ticker,
    "templateID": row.template_id,
    "projectionPeriod": proj_period, # 11 kurt suggested, confirmed by valsys
    "historicalPeriod": hist_period, # same as above
    "industry": row.IndustryGroup,
    "startPeriod":  row.fiscalYear,
    # The trailing "Z" declares UTC, so the timestamp must actually be UTC
    # rather than the cluster's local time.
    "startDate": datetime.datetime.now(datetime.timezone.utc).strftime('%Y-%m-%dT%H:%M:%S.%fZ'),
    "type": "DEFAULT",
    "periodType": "ANNUAL",
    "cashFlowType": "FCFF",
    "valuationType": "Perpetual Growth",
    "companyType": "Public",
    "targetVariable": "Implied share price",
    "variables": {"INTERNAL_SOURCE": row.source},
    # NOTE(review): "historicalMax" gets the earliest historical year and
    # "historicalMin" the last projected year. The names look swapped relative
    # to the values; confirm against the Valsys API before changing them.
    "historicalMax": row.fiscalYear - hist_period + 1,
    "historicalMin": row.fiscalYear + proj_period
  }
  model_configurations.append(configuration)

In [None]:
# Sanity check: number of model configurations the next cell will try to create.
print(len(model_configurations))

In [None]:
# Valsys Credentials
# SECURITY(review): plaintext credentials in the notebook; load them from a
# secret store (e.g. dbutils.secrets) instead and rotate this password.
username = "sbessey@cppib.com"
password = "Absyks_1234"

team = ["jworthington@cppib.com", "jfuller@cppib.com"] #, "sbessey@cppib.com", "kurtkang@cppib.com", "ngill@cppib.com", "nellery@cppib.com", "rbosshard@cppib.com", "jteng@cppib.com", 
       # "amohammad@cppib.com", "sgoyal@cppib.com", "apatil@cppib.com", "kgehlaut@cppib.com", "msawant@cppib.com", "dchan@cppib.com", "emuceniece@cppib.com", "katieyang@cppib.com"]

tags = ["DDI Machine Model"]
permission = "edit"

# Create the models and record one row per created model
spawn_model_info = []

tbl_info = {
 'table_name': 'model_engine.valsys_machine_models',
 's3_path': 's3a://cppib-iai-workspace/datalake/model_engine/valsys_machine_models.delta',
 'format': 'delta'
}

if len(model_configurations) > 0:

  # model_configurations can grow during iteration: an unsuccessful config is
  # appended once more (marked "retry") so it is attempted again at the end.
  for i, config in enumerate(model_configurations):

    print("({} of {}) - Creating model for ticker: {}, source: {}".format(i+1, len(model_configurations), config["ticker"], config["variables"]["INTERNAL_SOURCE"]))

    # Re-authenticate every 10 models, presumably to keep the token fresh.
    if i % 10 == 0:
      authenticate(username, password)

    # Tracking-table row: the payload without "variables", plus the source.
    config_dict = config.copy()
    config_dict["keydriver_source"] = config["variables"]["INTERNAL_SOURCE"]
    config_dict.pop("variables")

    handler = SocketHandler(config)
    # run() blocks until the socket closes, so the outcome is final here.
    # Polling handler.state afterwards could only exit at once or spin forever.
    handler.run()

    if handler.resp is not None:
      model_id = handler.resp["data"]["uid"]

      config_dict["model_id"] = model_id

      #tag models
      tag_models(model_id, tags)

      #share models
      for member in team:
        share_model(model_id, member, permission)

      spawn_model_info.append(config_dict)

      time.sleep(1)
    else:
      if handler.error is not None:
        print("error building model:", handler.error)
      else:
        print("no result received for ticker:", config["ticker"])
      # Retry any unsuccessful attempt once, including connections that closed
      # without an error message (these were previously dropped silently).
      if config.get("retry") != True:
        config["retry"] = True
        model_configurations.append(config)

  # spark.createDataFrame cannot infer a schema from an empty pandas frame.
  if spawn_model_info:
    model_df = spark.createDataFrame(pd.DataFrame(spawn_model_info))
    model_df.coalesce(1).write.saveAsTable(tbl_info["table_name"], mode="append", format=tbl_info["format"], path=tbl_info["s3_path"])
  else:
    print("no models were created; nothing written to", tbl_info["table_name"])

In [None]:
# username = "overlord@cppib.com"
# password = "DDIOverlord!23"

# team = ["katieyang@cppib.com"]
# tags = ["DDI Machine Model"]
# permission = "edit"

# authenticate(username, password)

In [None]:
# model_df = spark.read.table("model_engine.valsys_machine_models")
# model_ids = [x.model_id for x in model_df.select("model_id").distinct().collect()]

In [None]:
# for m in model_ids:
#   for t in team:
#     share_model(m, t, permission)