In [1]:
from pyspark.sql import SparkSession
import os, platform, sys

In [2]:

# Interpreter the executors use: lives inside the dependency archive, which YARN
# unpacks under the "environment" alias (the "#environment" suffix below).
EXECUTOR_PYTHON = "./environment/bin/python"
# Per-Python-version dependency archive on HDFS. {major}{minor} is filled from the
# driver's interpreter so the executor environment matches the driver.
PYSPARK_DEPS_ARCHIVE = "hdfs:///user/spark/libs/environments/pyspark{major}{minor}-deps/environment.tar.gz#environment"
# Extra JAR shipped with the session.
SPARK_UTILS_JAR = "hdfs:///user/spark/libs/extensions/spark-utils/spark-utils-0.0.3-SNAPSHOT.jar"


def build_spark_options(env=None):
    """Build the Spark configuration for this notebook from environment variables.

    Reads HOSTNAME, KEYTAB, PRINCIPAL and, depending on the OS, KRB5_CONFIG_JAVA
    (Windows) or KRB5_CONFIG (everything else). Options whose variable is unset or
    empty are omitted rather than passed through: SparkSession.Builder.config
    stringifies values, so a missing variable would otherwise become the literal
    text "None".

    Args:
        env: mapping to read variables from; defaults to ``os.environ``.

    Returns:
        dict[str, str]: Spark config key -> value ("spark.app.name" is what
        ``Builder.appName`` sets).

    Raises:
        ValueError: if exactly one of KEYTAB / PRINCIPAL is set.
    """
    env = os.environ if env is None else env

    # HOSTNAME is often a non-exported shell variable, so fall back to the node name.
    machine = env.get("HOSTNAME") or platform.node()
    keytab = env.get("KEYTAB")
    principal = env.get("PRINCIPAL")
    krb5_var = "KRB5_CONFIG_JAVA" if platform.system() == "Windows" else "KRB5_CONFIG"
    krb5 = env.get(krb5_var)

    if bool(keytab) != bool(principal):
        raise ValueError("KEYTAB and PRINCIPAL must be set together (or both left unset)")

    options = {
        "spark.app.name": f"MagellanJupyterSpark-{machine}",
        # Ship the environment built for this exact Python major.minor version.
        "spark.yarn.dist.archives": PYSPARK_DEPS_ARCHIVE.format(
            major=sys.version_info.major, minor=sys.version_info.minor
        ),
        "spark.jars": SPARK_UTILS_JAR,
    }
    if keytab:
        options["spark.yarn.keytab"] = keytab
        options["spark.yarn.principal"] = principal
    if krb5:
        options["spark.driver.extraJavaOptions"] = f"-Djava.security.krb5.conf={krb5}"
    return options


def getSpark():
    """Create (or reuse) the Hive-enabled SparkSession on YARN for this notebook.

    Points PYSPARK_PYTHON at the interpreter inside the shipped environment archive,
    then applies every option from ``build_spark_options()``.

    Returns:
        pyspark.sql.SparkSession: the session returned by ``getOrCreate()``.

    Raises:
        ValueError: if only one of KEYTAB / PRINCIPAL is set.
    """
    # Must be in place before the SparkContext starts.
    os.environ["PYSPARK_PYTHON"] = EXECUTOR_PYTHON

    builder = SparkSession.builder.enableHiveSupport().master("yarn")
    for key, value in build_spark_options().items():
        builder = builder.config(key, value)
    return builder.getOrCreate()

In [4]:
# Single session shared by every query below; getSpark() ends in getOrCreate(),
# so re-running this cell returns the existing session instead of starting a new one.
spark = getSpark()

# Target Data

In [8]:
# Sample 10 rows from the CIVS BMW non-health target table.
df_bmw = spark.sql(
    'SELECT * from bdddaires01p.civs_bmw_non_health limit 10'
)

In [48]:
# Worked 
# df_bmw.show()

In [9]:
# Sample 10 rows from the CIVS CPM non-health cases table.
df_cpm = spark.sql(
    'SELECT * from bdddaires01p.civs_cpm_non_health_cases limit 10'
)

In [47]:
# Worked
# df_cpm.show()

In [14]:
# Sample 10 rows from the FMS fraudmaster table.
df_fms = spark.sql(
    'SELECT * from bdsfms01p.fms_fraudmaster_ss limit 10'
)

In [46]:
# Worked
# df_fms.show()

In [36]:
# Sample 10 paid-away detail rows, restricted to the 2022-01-31 business-day partition.
df_payee = spark.sql(
    "Select * from bdsfms01p.fms_paidawaydetail_ss "
    "where edi_business_day = '2022-01-31' limit 10"
)

In [51]:
# Worked
# df_payee.show()

# MIMO Data

In [4]:
# Sample 10 categorised transactions with posted_date strictly between
# 2022-02-01 and 2022-03-03.
df_mimo = spark.sql(
    "Select * from bddmimo01p.gofcoe_base_trans_cat "
    "where posted_date > '2022-02-01' and posted_date < '2022-03-03'  limit 10"
)

In [5]:
# NOTE(review): this cell used to be flagged "DID NOT WORK", but the saved output of
# this run shows rows were returned, so the flag looks stale — confirm and remove.
# The output contains raw transaction narratives (personal names are visible);
# clear it before sharing or committing this notebook.
df_mimo.show()

+--------------+---------+----------------+-----------+---------------------+--------------------+--------+-----------------+-----------+-----+-------+----------------+---------------+--------+--------------------+----------------+-----------+
|transaction_id|agrmnt_id|transaction_type|     amount|transaction_narrative|     clean_narrative|brand_id|       brand_name|     stg_Id|  mcc| tier_1|          tier_2|         tier_3|fca_code|        fca_category|transaction_date|posted_date|
+--------------+---------+----------------+-----------+---------------------+--------------------+--------+-----------------+-----------+-----+-------+----------------+---------------+--------+--------------------+----------------+-----------+
|  103921898907|117612943|             EBP|    -157.91| SOPHIE LATHAM CLA...|SOPHIE LATHAM CLA...|   88164|LENDING WORKS LTD|DPC_S2_1068|20016|Finance|            Loan|               |    2001|Committed Expendi...|      2022-02-07| 2022-02-07|
|  103921902083|11761294

# Feature Bank Data

In [90]:
# Sample Feature Bank party/agreement rows with edi_business_day strictly between
# 2022-01-03 and 2022-01-05.
df_prty_sql = spark.sql(
    "Select * from bdp_eas_uca_prd_fb.fb_prty_agrmnt_ss "
    "where edi_business_day>'2022-01-03' and edi_business_day<'2022-01-05'  limit 10"
)

In [91]:
# Confirmed working: print the sampled rows (20 is DataFrame.show's default row cap).
df_prty_sql.show(20)

+---------+---------+---------------------+-------------------+------------------------------+--------------------+---------------------+----------------------------+-----------+---------------------+-------------------+----------------+
|  PRTY_ID|AGRMNT_ID|PRTY_AGRMNT_ROLE_CODE|AGRMNT_SUBTYPE_CODE|AGRMNT_XREF_SOURCE_SYSTEM_CODE|AGRMNT_BRANCH_NUMBER|AGRMNT_ACCOUNT_NUMBER|PRTY_XREF_SOURCE_DOMAIN_NAME|CUSTOMER_ID|AGRMNT_UNCLRD_BAL_AMT|AGRMNT_LDGR_BAL_AMT|edi_business_day|
+---------+---------+---------------------+-------------------+------------------------------+--------------------+---------------------+----------------------------+-----------+---------------------+-------------------+----------------+
|791867878|101514748|                  OWN|                MTA|                           ADB|          600118    |             46018719|                         CIN| 1547962586|                 null|             -42.28|      2022-01-04|
|840498174| 90100984|                  AGC|     

In [18]:
# Sample just 2 party category rows with edi_business_day strictly between
# 2022-01-03 and 2022-01-05.
df_prty_cat = spark.sql(
    "Select * from bdp_eas_uca_prd_fb.fb_prty_cat_part_ss "
    "where edi_business_day>'2022-01-03' and edi_business_day<'2022-01-05' limit 2"
)

In [21]:
# Worked
# df_prty_cat.show()

In [27]:
# Sample 10 agreement category rows with edi_business_day strictly between
# 2022-01-03 and 2022-01-05.
df_agrmnt_cat = spark.sql(
    "Select * from bdp_eas_uca_prd_fb.fb_agrmnt_cat_part_ss "
    "where edi_business_day>'2022-01-03' and edi_business_day<'2022-01-05' limit 10"
)

In [29]:
# Worked
# df_agrmnt_cat.show() 

# Verint Data

In [86]:
# Speech-analytics metadata, edi_business_day strictly between 2021-02-09 and 2021-02-15.
df_verint = spark.sql(
    "Select * from bdsver01p.ver_custom_sa_metadata_hst "
    "where edi_business_day >'2021-02-09' and edi_business_day<'2021-02-15' limit 10"
)
# Alternative source kept for reference:
# df_verint = spark.sql("Select * from bdwver01p.ver_custom_sa_metadata where edi_business_day >'2022-02-01' and edi_business_day<'2022-02-05' limit 10")

# Call-detail "end" records, edi_business_day strictly between 2021-01-02 and 2021-01-10.
df_verint1 = spark.sql(
    "Select * from bdscis01p.cis_vw_partitioned_fct_call_detail_end_ss "
    "where edi_business_day >'2021-01-02' and edi_business_day<'2021-01-10'  limit 10"
)
# Alternative source kept for reference:
# df_verint1 = spark.sql("Select * from bdwcis01p.cis_vw_partitioned_fct_call_detail_end where edi_business_day >'2021-06-06' and edi_business_day<'2021-06-07'  limit 10")

# Call-detail "leg" records for the same window.
df_verint2 = spark.sql(
    "Select * from bdscis01p.cis_vw_partitioned_fct_call_detail_leg_ss "
    "where edi_business_day >'2021-01-02' and edi_business_day<'2021-01-10' limit 10"
)
# Alternative source kept for reference:
# df_verint2 = spark.sql("Select * from bdwcis01p.cis_vw_partitioned_fct_call_detail_leg  limit 10")

In [88]:
# Not working in SH DB
# df_verint.show()

# Working using EAS_RAW DB and shows data also
# df_verint.show()

In [78]:
# Works on SH
# df_verint1.show()

# Working using EAS_RAW DB and shows data also
# df_verint1.show()

In [80]:
# Works on SH
# df_verint2.show()

# Working using EAS_RAW DB and shows data also
# df_verint2.show()

# Complaints

In [69]:
# NOTE(review): this query was labelled "No Access", yet the notes in the following
# cell record that df_complaints.show() worked from the SH DB — confirm the current
# access status and update this label.
df_complaints = spark.sql("Select * from bdspnx01p.pnx_dim_case_ss limit 10")

# The same data is also present in EAS_RAW; to read it from there instead, use:
# df_complaints = spark.sql("Select * from bdwpnx01p.pnx_dim_case limit 10")

In [89]:
# Worked when using from SH DB
# df_complaints.show()

# Worked when using from EAS_RAW DB 
# df_complaints.show()

# Webchats/Messaging

In [5]:
# Sample 10 rows from the messaging events table.
df_msg = spark.sql("Select * from bddpbrc01p.intx_messaging_kv_sdes_events limit 10")

# Webchat kv_sdes sample, crt_dt strictly between 2022-01-14 and 2022-01-29.
# Kept under its own name: binding it to df_wb (as before) meant the transcript
# query below overwrote it immediately, so this result was unreachable.
df_wb_sdes = spark.sql(
    "Select * from bddpbrc01p.intx_webchat_kv_sdes "
    "where crt_dt > '2022-01-14' and crt_dt < '2022-01-29' limit 10"
)

# Webchat transcript sample for the same window; df_wb keeps pointing at this one.
df_wb = spark.sql(
    "Select * from bddpbrc01p.intx_webchat_kv_transcript "
    "where crt_dt > '2022-01-14' and crt_dt < '2022-01-29' limit 10"
)

In [None]:
# Working
# df_msg.show()

In [6]:
# Working: print the sampled webchat rows held in df_wb (20 = show's default row cap).
df_wb.show(20)

# Clicks

In [35]:
# Sample 10 click hits with edi_business_day strictly between 2022-01-22 and 2022-01-28
# (filtering on the date partition is what made this query work).
df_clicks = spark.sql(
    "Select * from bdsabe01p.abe_hits "
    "where edi_business_day > '2022-01-22' and edi_business_day < '2022-01-28' limit 10"
)

In [37]:
# Working with date partition
# df_clicks.show()

In [93]:
# Sample 10 rows from the CIVS/Feature Bank test table.
# NOTE(review): this rebinds df_prty_sql, replacing the fb_prty_agrmnt_ss sample from
# the Feature Bank section — consider a distinct name (e.g. df_civs_fb).
df_prty_sql = spark.sql(
    "Select * from bdddaires01p.test_civs_fb limit 10"
)

In [94]:
# Print the test_civs_fb sample (20 = DataFrame.show's default row cap).
df_prty_sql.show(20)

+---------+---------------------------------+-----------------------------------------+-------------------------------------+----------------------------------+--------------------------------------+------------------------------------+----------------+
|  prty_id|all_agrmnts_all_accounts_count_v1|all_agrmnts_credit_card_accounts_count_v1|all_agrmnts_current_accounts_count_v1|all_agrmnts_loan_accounts_count_v1|all_agrmnts_mortgage_accounts_count_v1|all_agrmnts_saving_accounts_count_v1|edi_business_day|
+---------+---------------------------------+-----------------------------------------+-------------------------------------+----------------------------------+--------------------------------------+------------------------------------+----------------+
|837822921|                             null|                                     null|                                 null|                              null|                                  null|                                null|  