In [0]:
%python
## Import libraries
import pyspark as ps
from pyspark.sql.functions import *
from pyspark.sql.types import *
from datetime import *
import pandas as pd
import json

In [0]:
# Read the notebook parameters supplied through the Databricks widgets.
job_id = dbutils.widgets.get("job_id")
usecase_id = dbutils.widgets.get("usecase_id")

# Echo both values to the cell output, one "name: value" line each.
for _label, _value in (("job_id", job_id), ("usecase_id", usecase_id)):
    print(f"{_label}: {_value}")

In [0]:
class Validate_ATL:
  """Tag processed NGEBox suggestion requests with 'Missing ATL in iEngage'.

  The class builds one Spark SQL MERGE statement for a use case, runs it,
  and wraps the collected result rows in the ``nge_response`` envelope.

  Expected call order::

      set_query_string -> run_query_and_set_sqldf
          -> format_sqldf_to_json -> construct_microservice_response

  Attributes:
    query: SQL text to execute (a placeholder until ``set_query_string`` runs).
    sql_df: list of collected Spark ``Row`` objects from the last run.
    data_json: legacy attribute, never read by this class; kept for callers.
    nb_json: ``{"data": [...]}`` built by ``format_sqldf_to_json``.
    response: envelope built by ``construct_microservice_response``
      (``None`` until then).
    usecase_id: the id passed to the last ``run_query_and_set_sqldf`` call.
  """

  # Value of ``self.query`` before ``set_query_string`` has been called.
  _UNSET_QUERY = "Query About to fill"

  # The use-case id is spliced into SQL string literals, so it is restricted
  # to characters that cannot terminate or escape a quoted literal.
  _SAFE_ID_CHARS = frozenset(
    "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789_-"
  )

  def __init__(self):
    self.query = self._UNSET_QUERY
    # A list (not a DataFrame): this is what run_query_and_set_sqldf stores
    # and what format_sqldf_to_json iterates over.
    self.sql_df = []
    self.data_json = {"Data":[]}
    # Initialised here so the later pipeline steps never hit AttributeError
    # when called out of order.
    self.nb_json = {"data": []}
    self.response = None
    self.usecase_id = None

  @classmethod
  def _checked_usecase_id(cls, raw_id):
    """Return ``raw_id`` as text, or raise ValueError if unsafe for SQL."""
    if raw_id is None:
      raise ValueError("usecase id must not be None")
    id_text = str(raw_id)
    if not id_text or not cls._SAFE_ID_CHARS.issuperset(id_text):
      raise ValueError(
        f"Unsafe usecase id {id_text!r}: only letters, digits, '_' and '-' are allowed"
      )
    return id_text

  def logger(self, message):
    """Print ``message`` to the notebook output."""
    print(message)

  def set_query_string(self, usecase_parm):
    """Build the MERGE statement for one use case and store it in ``self.query``.

    Shape of the SQL (CTE names as in the statement):
      * ``m`` - ``hist_retention`` of the use case from ``metadata_usecase``.
      * ``s`` - processed requests of the use case that either have no row in
        ``rep_suggestion_vod__c`` or have one with ``DISMISSED_VOD__C = 0``.
      * ``t`` - processed requests whose zip-derived territory (``geo_cd``)
        appears in the account's ``TERRITORY_VOD__C`` list from the
        account-territory loader view.
      * ``p`` - requests in ``s`` that are NOT in ``t``, created today for
        use case '514', otherwise within the last ``hist_retention`` days.
    The MERGE sets ``Not_Sent_To_Veeva_Reason`` on the rows in ``p``,
    appending with '|' when a reason is already present.

    Args:
      usecase_parm: use-case id; converted with ``str()`` before use.

    Raises:
      ValueError: if the id is None, empty, or contains characters other than
        ASCII letters, digits, '_' and '-'. ``self.query`` is left unchanged.
    """
    # Validate first: the value is interpolated into quoted SQL literals.
    usecase_parm = self._checked_usecase_id(usecase_parm)
    query = f"""with m(usecase_id, hist_retention) AS (SELECT mu.USECASE_ID,mu.hist_retention from com_us_alyt_ngebox.metadata_usecase mu where mu.USECASE_ID = '{usecase_parm}'),

s(usecase_id,SUGGEST_EXTERNAL_ID_VOD__C,NGEBox_Record_ID) AS (select p.usecase_id, p.SUGGEST_EXTERNAL_ID_VOD__C, p.NGEBox_Record_ID from (select * from com_us_alyt_ngebox.ngebox_suggestion_requests_processed where usecase_id = '{usecase_parm}') p 
left join com_us_lake.rep_suggestion_vod__c s 
on p.SUGGEST_EXTERNAL_ID_VOD__C = s.SUGGESTION_EXTERNAL_ID_VOD__C  
where (s.SUGGESTION_EXTERNAL_ID_VOD__C is NULL) 
or (s.SUGGESTION_EXTERNAL_ID_VOD__C is not null and s.DISMISSED_VOD__C = 0)),

t(usecase_id,Record_ID,account_ID,Territory_name) as (select distinct tb1.usecase_id, tb1.NGEBOX_RECORD_ID, tb1.account_vod__c,tb4.geo_cd from (select * from com_us_alyt_ngebox.NGEBox_Suggestion_Requests_Processed  where usecase_id = '{usecase_parm}') tb1
join com_us_alyt_ngebox.METADATA_USECASE tb2 on tb1.usecase_id = tb2.USECASE_ID 
join com_us_alyt_ngebox.lkp_prod_salesforce  tb3 on tb3.Prod_Nm = tb2.Prod_Nm
join com_us_hub.ref_zip_terr tb4 
on tb1.hcp_zip_code = tb4.zip_cd and tb3.sf_cd= tb4.sf_cd and tb4.curr_ind = 'Y' and tb4.level = 'Territory' 
join (select account_vod__c, explode(split(btrim(TERRITORY_VOD__C,';'),';')) as abc from com_us_alyt_omnichannel.ib_ilay_rep_account_territory_loader_v where ISDELETED <> 1) tb5
on tb1.account_vod__c = tb5.account_vod__c and tb4.geo_cd = tb5.abc
left join com_us_lake.rep_suggestion_vod__c tb6
on tb1.SUGGEST_EXTERNAL_ID_VOD__C = tb6.SUGGESTION_EXTERNAL_ID_VOD__C
where tb1.usecase_id = '{usecase_parm}' and ((tb6.SUGGESTION_EXTERNAL_ID_VOD__C is NULL) 
or (tb6.SUGGESTION_EXTERNAL_ID_VOD__C is not null and tb6.DISMISSED_VOD__C = 0))
order by tb1.NGEBOX_RECORD_ID,  tb4.geo_cd),

p(usecase_id,NG_RECORD_ID,account_vod__c,SUGGEST_EXTERNAL_ID_VOD__C ) AS (select  process_table.usecase_id,process_table.NGEBOX_RECORD_ID, process_table.account_vod__c,process_table.SUGGEST_EXTERNAL_ID_VOD__C from (select * from com_us_alyt_ngebox.NGEBox_Suggestion_Requests_Processed  where usecase_id = '{usecase_parm}') process_table
where process_table.usecase_id = '{usecase_parm}'
and (case when '{usecase_parm}' IN ('514') then (CAST(process_table.created_date as DATE) = current_date) else CAST(process_table.created_date as DATE) >= date_add(current_date, -(select m.hist_retention from m where m.usecase_id = '{usecase_parm}')) end)
and process_table.NGEBox_Record_ID not in (select Record_ID from t where usecase_id = '{usecase_parm}')
and process_table.SUGGEST_EXTERNAL_ID_VOD__C in (select SUGGEST_EXTERNAL_ID_VOD__C from s where s.usecase_id ='{usecase_parm}' ))

merge into com_us_alyt_ngebox.NGEBox_Suggestion_Requests_Processed process_table using (select distinct usecase_id,NG_RECORD_ID from p where usecase_id = '{usecase_parm}') source_table On (process_table.usecase_id = '{usecase_parm}' and process_table.NGEBOX_RECORD_ID = source_table.NG_RECORD_ID )
when matched then update set process_table.Not_Sent_To_Veeva_Reason = (case when ISNULL(process_table.Not_Sent_To_Veeva_Reason)= true then 'Missing ATL in iEngage' else concat(process_table.Not_Sent_To_Veeva_Reason, '|', 'Missing ATL in iEngage' ) end)"""
    self.query = query

  def run_query_and_set_sqldf(self, usecase_id):
    """Execute ``self.query`` and store the collected rows in ``self.sql_df``.

    ``self.query`` is split on ':' and every non-blank piece is run as its own
    statement through ``sqlContext.sql``; any literal ``Usecase_parm`` token in
    a piece is replaced by the use-case id first. The statement produced by
    ``set_query_string`` contains neither ':' nor that token, so it runs once,
    unchanged.
    NOTE(review): the ':' split would also cut a statement that contains a
    colon inside a literal (e.g. a time value) - keep that in mind if
    ``self.query`` is ever assigned by hand.

    Args:
      usecase_id: use-case id; stored as given on ``self.usecase_id``.

    Raises:
      RuntimeError: if no query has been set yet.
      ValueError: if ``usecase_id`` fails the same safety check as in
        ``set_query_string``.
    """
    if self.query == self._UNSET_QUERY:
      # Fail early instead of sending the placeholder text to Spark.
      raise RuntimeError("No query set: call set_query_string() before run_query_and_set_sqldf()")
    # str() also covers non-string ids, which str.replace would reject.
    id_text = self._checked_usecase_id(usecase_id)
    self.usecase_id = usecase_id

    collected_rows = []
    for statement in self.query.split(":"):
      if not statement.strip():
        continue
      runnable = statement.replace("Usecase_parm", id_text)
      collected_rows.extend(sqlContext.sql(runnable).collect())
    # Assigned only after every statement succeeded, so a failure never
    # leaves a partial result behind.
    self.sql_df = collected_rows

  def format_sqldf_to_json(self):
    """Convert ``self.sql_df`` into ``self.nb_json`` (``{"data": [records]}``).

    Each row is turned into a dict with ``asDict(True)``; the records are then
    serialised by pandas and parsed back, so ``nb_json`` holds only plain
    JSON-compatible Python values. An empty ``sql_df`` yields ``{"data": []}``.
    """
    row_dicts = [row.asDict(True) for row in self.sql_df]
    records_json = pd.DataFrame(row_dicts).to_json(orient="records")  # str
    self.nb_json = {"data": json.loads(records_json)}  # list of dicts

  def construct_microservice_response(self):
    """Wrap ``self.nb_json`` in the ``nge_response`` envelope with status 200."""
    self.response = {
      "nge_response": {
        "status": 200,
        "body": self.nb_json
      }
    }

In [0]:
# Read the use-case id from the notebook widget.
usecase_id = dbutils.widgets.get("usecase_id")

# Build the SQL, execute it, then shape the collected rows into the response envelope.
nb_obj = Validate_ATL()
nb_obj.set_query_string(usecase_id)
nb_obj.run_query_and_set_sqldf(usecase_id)
nb_obj.format_sqldf_to_json()
nb_obj.construct_microservice_response()

# Return the envelope to whoever invoked this notebook.
# NOTE(review): `response` is a dict, while notebook.exit is documented as taking a
# string - confirm the caller copes with the stringified form (json.dumps may be intended).
notebook_result = nb_obj.response
dbutils.notebook.exit(notebook_result)