In [1]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.context import SparkContext
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [2]:
# creating a dictionary to get all the sheets
df_dict = pd.read_excel('./data.xlsx', sheet_name=None)

In [3]:
# Using the sheet names I save the dfs and then I export them into csv file in the same directory
sheets = ['homologacion_pais','homologacion_rating','rating_empresa','rating_soberano']

for sheet in sheets:
    df = df_dict.get(sheet)
    df.to_csv('./'+sheet+'.csv', sep='|')

In [4]:
sc = SparkSession.builder.appName("PysparkDeloitte")\
.config ("spark.sql.shuffle.partitions", "50")\
.config("spark.driver.maxResultSize","5g")\
.config ("spark.sql.execution.arrow.enabled", "true")\
.getOrCreate()

In [5]:
#Creates a spark data frame called as raw_data.
#CSV FILES# 
df_hp = sc.read.csv(sheets[0]+'.csv', sep='|', header=True)
df_hr = sc.read.csv(sheets[1]+'.csv', sep='|', header=True)
df_re = sc.read.csv(sheets[2]+'.csv', sep='|', header=True)
df_rs = sc.read.csv(sheets[3]+'.csv', sep='|', header=True)

In [6]:
# rating_empresa needs to integrate pais, rating_empresa and then rating_soberano

In [7]:
df_hp.createOrReplaceTempView("H_PAIS")
df_hr.createOrReplaceTempView("H_RATING")
df_re.createOrReplaceTempView("R_EMPRESA")
df_rs.createOrReplaceTempView("R_SOBERANO")

In [8]:
# First: we integrate PAIS into the dataframe df_re that we are going to use as a master
#df_master = df_re.join(df_hp, df_re.pais_bbg == upper(df_hp.pais_bbg), "inner")

df_master = sc.sql('select rut, dv, nombre, RE.pais_bbg, upper(HP.pais) as pais, mdy, sp, fitch from R_EMPRESA RE left join H_PAIS HP on upper(RE.pais_bbg) == upper(HP.pais_bbg)')
df_master.createOrReplaceTempView("MASTER")
df_master.show()

+--------+---+--------------------+--------------------+--------------+---+----+-----+
|     rut| dv|              nombre|            pais_bbg|          pais|mdy|  sp|fitch|
+--------+---+--------------------+--------------------+--------------+---+----+-----+
|41203332|  9|INDUSTRIAL AND CO...|               CHINA|         CHINA| A1|   A|    A|
|40000177|  4|   ITAU UNIBANCO S-A|              BRAZIL|        BRASIL|Ba1| BB-|   BB|
|41001123|  9|BCO BRADESCO SA B...|              BRAZIL|        BRASIL|Ba1| BB-|   BB|
|42001513|  5|BANCO SANTANDER B...|              BRAZIL|        BRASIL|Ba1| BB-|   WD|
|41001310|  K|BCO DO BRASIL S A...|              BRAZIL|        BRASIL|Ba1| BB-|  BB-|
|41202256|  4|BANK OF CHINA HON...|               CHINA|         CHINA| A1|   A|    A|
|44000061|  4|CHINA CONSTRUCTIO...|               CHINA|         CHINA| A1|   A|    A|
|41001391|  6|CITIBANK NA ESTAD...|UNITED STATES OF ...|ESTADOS UNIDOS|Aa3|  A+|   A+|
|41001363|  0|BCO SAFRA SA CAMP...|        

In [9]:
df_master_noagg = sc.sql("select rut, dv, nombre, pais_bbg, pais, mdy as rating, 'MDY' as agencia from MASTER union all select rut, dv, nombre, pais_bbg, pais, sp as rating, 'SP' as agencia from MASTER union all select rut, dv, nombre, pais_bbg, pais, fitch as rating, 'FITCH' as agencia from MASTER")
df_master_noagg.createOrReplaceTempView("MASTER_NOAGG")
df_master_noagg.show()

+--------+---+--------------------+--------------------+--------------+------+-------+
|     rut| dv|              nombre|            pais_bbg|          pais|rating|agencia|
+--------+---+--------------------+--------------------+--------------+------+-------+
|41203332|  9|INDUSTRIAL AND CO...|               CHINA|         CHINA|    A1|    MDY|
|40000177|  4|   ITAU UNIBANCO S-A|              BRAZIL|        BRASIL|   Ba1|    MDY|
|41001123|  9|BCO BRADESCO SA B...|              BRAZIL|        BRASIL|   Ba1|    MDY|
|42001513|  5|BANCO SANTANDER B...|              BRAZIL|        BRASIL|   Ba1|    MDY|
|41001310|  K|BCO DO BRASIL S A...|              BRAZIL|        BRASIL|   Ba1|    MDY|
|41202256|  4|BANK OF CHINA HON...|               CHINA|         CHINA|    A1|    MDY|
|44000061|  4|CHINA CONSTRUCTIO...|               CHINA|         CHINA|    A1|    MDY|
|41001391|  6|CITIBANK NA ESTAD...|UNITED STATES OF ...|ESTADOS UNIDOS|   Aa3|    MDY|
|41001363|  0|BCO SAFRA SA CAMP...|        

In [10]:
df_master_noagg_on = sc.sql('select M.*, HR.orden_norma, HR.rating_norma from MASTER_NOAGG M LEFT JOIN H_RATING HR ON M.RATING=HR.RATING AND M.AGENCIA=HR.AGENCIA_HOMOL')
df_master_noagg_on.createOrReplaceTempView("MASTER_NOAGG_ON")
df_master_noagg_on.filter(df_master_noagg_on['nombre']=='MERCANTILE BANK OF ST LOUIS USA').show()

+--------+---+--------------------+--------------------+--------------+------+-------+-----------+------------+
|     rut| dv|              nombre|            pais_bbg|          pais|rating|agencia|orden_norma|rating_norma|
+--------+---+--------------------+--------------------+--------------+------+-------+-----------+------------+
|41203327|  2|MERCANTILE BANK O...|UNITED STATES OF ...|ESTADOS UNIDOS|     -|    MDY|       null|        null|
|41203327|  2|MERCANTILE BANK O...|UNITED STATES OF ...|ESTADOS UNIDOS|     -|     SP|       null|        null|
|41203327|  2|MERCANTILE BANK O...|UNITED STATES OF ...|ESTADOS UNIDOS|     -|  FITCH|       null|        null|
+--------+---+--------------------+--------------------+--------------+------+-------+-----------+------------+



In [11]:
df_master_noagg_on_ranked = sc.sql('select *, rank() OVER(PARTITION BY rut,dv,nombre,pais_bbg,pais ORDER BY orden_norma DESC) as rank from MASTER_NOAGG_ON')
df_master_noagg_on_ranked.createOrReplaceTempView("MASTER_NOAGG_ON_RANKED")
df_master_noagg_on_ranked.show()

+--------+---+--------------------+--------------------+--------------+------+-------+-----------+------------+----+
|     rut| dv|              nombre|            pais_bbg|          pais|rating|agencia|orden_norma|rating_norma|rank|
+--------+---+--------------------+--------------------+--------------+------+-------+-----------+------------+----+
|40000008|  5|SOCIETE GENERAL P...|              FRANCE|       FRANCIA|    A-|  FITCH|          7|          A-|   1|
|40000008|  5|SOCIETE GENERAL P...|              FRANCE|       FRANCIA|     A|     SP|          6|           A|   2|
|40000008|  5|SOCIETE GENERAL P...|              FRANCE|       FRANCIA|    A1|    MDY|          5|          A+|   3|
|40000016|  6|DEUTSCHE BANK AG-...|             GERMANY|      ALEMANIA|   BBB|  FITCH|          9|         BBB|   1|
|40000016|  6|DEUTSCHE BANK AG-...|             GERMANY|      ALEMANIA|  BBB+|     SP|          8|        BBB+|   2|
|40000016|  6|DEUTSCHE BANK AG-...|             GERMANY|      AL

In [12]:
df_master2 = sc.sql('select rut,dv,nombre,pais_bbg,pais,rating_norma as rating_empresa from MASTER_NOAGG_ON_RANKED where rank = 1').dropDuplicates()
df_master2.createOrReplaceTempView("MASTER2")
df_master2.show()

+--------+---+--------------------+--------------------+--------------+--------------+
|     rut| dv|              nombre|            pais_bbg|          pais|rating_empresa|
+--------+---+--------------------+--------------------+--------------+--------------+
|40000008|  5|SOCIETE GENERAL P...|              FRANCE|       FRANCIA|            A-|
|40000016|  6|DEUTSCHE BANK AG-...|             GERMANY|      ALEMANIA|           BBB|
|40000032|  8|BCO CONTINENTAL LIMA|                PERU|          PERU|          BBB+|
|40000039|  5|RAIFFEISEN ZENTRA...|             AUSTRIA|       AUSTRIA|            A-|
|40000070|  0|BBANK OF AMERICA ...|UNITED STATES OF ...|ESTADOS UNIDOS|            A+|
|40000098|  0|    BBVA Colombia SA|            COLOMBIA|      COLOMBIA|           BBB|
|40000116|  2|BCO DE CREDITO DE...|                PERU|          PERU|          BBB+|
|40000154|  5|BNP PARIBAS S.A. ...|              FRANCE|       FRANCIA|            A+|
|40000177|  4|   ITAU UNIBANCO S-A|        

In [13]:
df_rs_wu = sc.sql('select pais_bbg, REPLACE(sp,"u","") as SP, REPLACE(fitch,"u","") as FITCH, REPLACE(mdy,"u","") as MDY from R_SOBERANO')
df_rs_wu.createOrReplaceTempView("R_SOBERANO_WU")
df_rs_wu.show()

+--------------------+--------------------+--------------------+--------------------+
|            pais_bbg|                  SP|               FITCH|                 MDY|
+--------------------+--------------------+--------------------+--------------------+
|           Abu Dhabi|                  AA|                  AA|                 Aa2|
|             Albania|                  B+|                null|                  B1|
|             Algeria|                null|                null|                null|
|             Andorra|                 BBB|                null|                null|
|              Angola|                CCC+|                 CCC|                Caa1|
|           Argentina|                CCC+|                  WD|                  Ca|
|             Armenia|                null|                  B+|                 Ba3|
|               Aruba|                 BBB|                  BB|                null|
|           Australia|                 AAA|           

In [14]:
df_rs_cleaned = sc.sql('select CASE WHEN pais_bbg = "United States" THEN "United States of America" WHEN pais_bbg = "South Korea" THEN "Korea, Republic of" ELSE pais_bbg END as pais_bbg , SP, FITCH, MDY from R_SOBERANO_WU')
df_rs_cleaned.createOrReplaceTempView("R_SOBERANO_WU")
df_rs_cleaned.show()

+--------------------+--------------------+--------------------+--------------------+
|            pais_bbg|                  SP|               FITCH|                 MDY|
+--------------------+--------------------+--------------------+--------------------+
|           Abu Dhabi|                  AA|                  AA|                 Aa2|
|             Albania|                  B+|                null|                  B1|
|             Algeria|                null|                null|                null|
|             Andorra|                 BBB|                null|                null|
|              Angola|                CCC+|                 CCC|                Caa1|
|           Argentina|                CCC+|                  WD|                  Ca|
|             Armenia|                null|                  B+|                 Ba3|
|               Aruba|                 BBB|                  BB|                null|
|           Australia|                 AAA|           

In [15]:
df_rs_wu_noagg = sc.sql("select pais_bbg, SP as rating, 'SP' as agencia from R_SOBERANO_WU union all select pais_bbg, FITCH as rating, 'FITCH' as agencia from R_SOBERANO_WU union all select pais_bbg, MDY as rating, 'MDY' as agencia from R_SOBERANO_WU")
df_rs_wu_noagg.createOrReplaceTempView("R_SOBERANO_WU_NOAGG")
df_rs_wu_noagg.filter(df_rs_wu_noagg['pais_bbg']=='Chile').show()

+--------+------+-------+
|pais_bbg|rating|agencia|
+--------+------+-------+
|   Chile|     A|     SP|
|   Chile|    A-|  FITCH|
|   Chile|    A1|    MDY|
+--------+------+-------+



In [16]:
df_master3_noagg_on = sc.sql('select M.*, HR.orden_norma, HR.rating_norma from R_SOBERANO_WU_NOAGG M left join H_RATING HR ON M.RATING=HR.RATING AND M.AGENCIA=HR.AGENCIA_HOMOL')
df_master3_noagg_on.createOrReplaceTempView("MASTER3_NOAGG_ON")
df_master3_noagg_on.show()

+--------------------+--------------------+-------+-----------+------------+
|            pais_bbg|              rating|agencia|orden_norma|rating_norma|
+--------------------+--------------------+-------+-----------+------------+
|           Abu Dhabi|                  AA|     SP|          3|          AA|
|             Albania|                  B+|     SP|         14|          B+|
|             Algeria|                null|     SP|       null|        null|
|             Andorra|                 BBB|     SP|          9|         BBB|
|              Angola|                CCC+|     SP|         17|        CCC+|
|           Argentina|                CCC+|     SP|         17|        CCC+|
|             Armenia|                null|     SP|       null|        null|
|               Aruba|                 BBB|     SP|          9|         BBB|
|           Australia|                 AAA|     SP|          1|         AAA|
|             Austria|                 AA+|     SP|          2|         AA+|

In [17]:
df_master3_noagg_on_ranked = sc.sql('select *, rank() OVER(PARTITION BY pais_bbg ORDER BY orden_norma DESC) as rank from MASTER3_NOAGG_ON')
df_master3_noagg_on_ranked.createOrReplaceTempView("MASTER3_NOAGG_ON_RANKED")
df_master3_noagg_on_ranked.show()

+---------+------+-------+-----------+------------+----+
| pais_bbg|rating|agencia|orden_norma|rating_norma|rank|
+---------+------+-------+-----------+------------+----+
|Abu Dhabi|    AA|     SP|          3|          AA|   1|
|Abu Dhabi|    AA|  FITCH|          3|          AA|   1|
|Abu Dhabi|   Aa2|    MDY|          3|          AA|   1|
|  Albania|    B+|     SP|         14|          B+|   1|
|  Albania|    B1|    MDY|         14|          B+|   1|
|  Albania|  null|  FITCH|       null|        null|   3|
|  Algeria|  null|     SP|       null|        null|   1|
|  Algeria|  null|  FITCH|       null|        null|   1|
|  Algeria|  null|    MDY|       null|        null|   1|
|  Andorra|   BBB|     SP|          9|         BBB|   1|
|  Andorra|  null|  FITCH|       null|        null|   2|
|  Andorra|  null|    MDY|       null|        null|   2|
|   Angola|   CCC|  FITCH|         18|         CCC|   1|
|   Angola|  CCC+|     SP|         17|        CCC+|   2|
|   Angola|  Caa1|    MDY|     

In [22]:
df_master3 = sc.sql('select pais_bbg, rating_norma as rating_soberano from MASTER3_NOAGG_ON_RANKED where rank = 1').dropDuplicates()
df_master3.createOrReplaceTempView("RATING_SOBERANO_HOMOL")
df_master3.show()

+--------------------+---------------+
|            pais_bbg|rating_soberano|
+--------------------+---------------+
|United States of ...|            AA+|
+--------------------+---------------+



In [19]:
df_master_pro = sc.sql('select rut, dv, nombre, pais, rating_empresa, RS.rating_soberano from MASTER2 M2 left join RATING_SOBERANO_HOMOL RS on upper(M2.pais_bbg) == upper(RS.pais_bbg)')
df_master_pro.show()

+--------+---+--------------------+--------------+--------------+---------------+
|     rut| dv|              nombre|          pais|rating_empresa|rating_soberano|
+--------+---+--------------------+--------------+--------------+---------------+
|40000008|  5|SOCIETE GENERAL P...|       FRANCIA|            A-|             AA|
|40000016|  6|DEUTSCHE BANK AG-...|      ALEMANIA|           BBB|            AAA|
|40000032|  8|BCO CONTINENTAL LIMA|          PERU|          BBB+|           BBB+|
|40000039|  5|RAIFFEISEN ZENTRA...|       AUSTRIA|            A-|            AA+|
|40000070|  0|BBANK OF AMERICA ...|ESTADOS UNIDOS|            A+|            AA+|
|40000098|  0|    BBVA Colombia SA|      COLOMBIA|           BBB|            BBB|
|40000116|  2|BCO DE CREDITO DE...|          PERU|          BBB+|           BBB+|
|40000154|  5|BNP PARIBAS S.A. ...|       FRANCIA|            A+|             AA|
|40000177|  4|   ITAU UNIBANCO S-A|        BRASIL|           BB-|            BB-|
|40000315|  7|LL

In [20]:
df_master_pro.toPandas().to_csv('./master_rating.csv', sep='|')

In [21]:
sc.stop()