In [1]:
"""
06-14-202
"""

In [2]:
# load libs
import os

import pycountry
from pyspark.sql.functions import udf, col, split, lit, to_json, struct
from pyspark.sql.types import StringType

In [3]:
# Connection settings for Azure Blob Storage.
# Credentials are read from environment variables instead of being hard-coded in the
# notebook (if running on Databricks, dbutils.secrets.get(scope, key) is the usual
# alternative). The placeholder defaults only preserve the previous behaviour when the
# variables are unset -- never replace them with a real key.
storage_account_name = os.environ.get('AZURE_STORAGE_ACCOUNT_NAME', 'xxx')
storage_account_access_key = os.environ.get('AZURE_STORAGE_ACCOUNT_KEY', 'xxxx')
spark.conf.set('fs.azure.account.key.' + storage_account_name + '.blob.core.windows.net', storage_account_access_key)
blob_container = os.environ.get('AZURE_BLOB_CONTAINER', 'xxx')

In [4]:
"""
Supply Chain History
"""

In [5]:
# Location of the raw supply chain history CSV inside the configured blob container.
supplyChainHistoryPath = "wasbs://{}@{}.blob.core.windows.net/supplyChainHistory.csv".format(
    blob_container, storage_account_name
)

In [6]:
# Read the raw supply chain history: first row is the header, column types are inferred.
df_supplyChainHistory_raw = spark.read.csv(supplyChainHistoryPath, header=True, inferSchema=True)

In [7]:
# Keep only Saudi (AB) companies: rows whose TICKER_AND_EXCHANGE_CODE contains " AB "
# (regex match; splitting on spaces and comparing the second token would also work).
saudi_ticker_mask = col("TICKER_AND_EXCHANGE_CODE").rlike(" AB ")
df_SaudiCompanies = df_supplyChainHistory_raw.filter(saudi_ticker_mask)

In [8]:
# Derive EXCHANGE_CODE as the second space-separated token of RELATED_COMPANY_TICKER.
ticker_tokens = split(col("RELATED_COMPANY_TICKER"), ' ')
df_exchange_code = df_SaudiCompanies.withColumn("EXCHANGE_CODE", ticker_tokens.getItem(1))

In [9]:
# Register the Saudi rows (now carrying EXCHANGE_CODE) as a temp view for the SQL join.
df_exchange_code.createOrReplaceTempView(name="supplyChainHistory_table")

In [10]:
"""
exchange codes
"""

In [11]:
# Location of the MIC -> Bloomberg exchange-code lookup CSV in the same container.
MIC_TO_BB_EXCHPath = "wasbs://{}@{}.blob.core.windows.net/MIC_TO_BB_EXCH.csv".format(
    blob_container, storage_account_name
)

In [12]:
# Read the exchange lookup: first row is the header, column types are inferred.
df_ExchangeCode = spark.read.csv(MIC_TO_BB_EXCHPath, header=True, inferSchema=True)

In [13]:
# UDF mapping an ISO 3166-1 alpha-2 code to its pycountry country name.
# The lookup dict is built once on the driver and captured by the lambda; codes that
# are not in pycountry (and nulls) map to "No Country".
# StringType is imported in the imports cell at the top of the notebook.
country_dict = {country.alpha_2: country.name for country in pycountry.countries}
udf_get_country = udf(lambda x: country_dict.get(x, "No Country"), StringType())

In [14]:
# Keep the four lookup columns needed downstream and rename them from the CSV's
# space-separated headers to underscore names usable unquoted in Spark SQL.
# (The country name itself is added in the next cell.)
exchange_code_renames = [
    ("EQUITY EXCH CODE", "EQUITY_EXCH_CODE"),
    ("EQUITY EXCH NAME", "EQUITY_EXCH_NAME"),
    ("Composite Code", "COMPOSITE_CODE"),
    ("ISO COUNTRY", "ISO_COUNTRY"),
]
df_ExchangeCode_clean = df_ExchangeCode.select(
    [col(source_name).alias(target_name) for source_name, target_name in exchange_code_renames]
)

In [15]:
 df_ExchangeCode_clean =  df_ExchangeCode_clean.withColumn("COUNTRY", udf_get_country(col("ISO_COUNTRY")))

In [16]:
# Register the cleaned exchange lookup (with COUNTRY) as a temp view for the SQL join.
df_ExchangeCode_clean.createOrReplaceTempView(name="ExchangeCode_table")

In [17]:
"""
Join both tables and generate the final result
"""

In [18]:
# Join the Saudi supply chain rows (supplyChainHistory_table) to the exchange lookup
# (ExchangeCode_table) on the exchange code parsed from RELATED_COMPANY_TICKER, and
# expose the lookup's COUNTRY as RELATED_COMPANY_COUNTRY. All other columns come from
# the supply chain table.
# NOTE(review): INNER JOIN drops every row whose EXCHANGE_CODE is null (e.g. no related
# ticker) or absent from the lookup -- confirm that is intended vs. a LEFT JOIN.
# NOTE(review): if EQUITY_EXCH_CODE is not unique in MIC_TO_BB_EXCH.csv (a MIC -> BB
# mapping may list several MICs per code), matching rows are duplicated -- TODO verify
# or de-duplicate the lookup on EQUITY_EXCH_CODE before joining.
SaudiSupplyChain = spark.sql("""
SELECT    ID_BB_COMPANY,
          TICKER_AND_EXCHANGE_CODE,
          LONG_COMP_NAME,
          RELATED_ID_BB_COMPANY,
          RELATED_COMPANY_TICKER,
          RELATED_COMPANY_NAME,
          COUNTRY AS RELATED_COMPANY_COUNTRY,
          RELATED_COMPANY_CURRENT_MKT_STAT,
          RELATIONSHIP,
          RELATIONSHIP_YEAR,
          RELATIONSHIP_PERIOD,
          SUPPLY_CHAIN_COST_ACCOUNT_TYPE,
          RELATIONSHIP_PERIOD_END_DATE,
          RELATIONSHIP_AS_OF_DATE,
          RELATIONSHIP_AMOUNT,
          EQY_FUND_CRNCY,
          RELATIONSHIP_PERCENT,
          SOURCE_OF_RELATIONSHIP,
          ID_BB_GLOBAL,
          ID_BB_GLOBAL_COMPANY,
          RELATED_FIGI,
          RELATED_ID_BB_GLOBAL_COMPANY

  FROM supplyChainHistory_table INNER JOIN  ExchangeCode_table ON supplyChainHistory_table.EXCHANGE_CODE = ExchangeCode_table.EQUITY_EXCH_CODE
  
""")

In [19]:
# Optional preview: uncomment to display the first 5 joined rows.
# SaudiSupplyChain.show(5)

In [20]:
"""
Write the Saudi supply chain history data to a file
"""

In [21]:
# Destination for the joined Saudi supply chain data.
saudi_supply_chain_Path = "wasbs://{}@{}.blob.core.windows.net/saudi_supply_chain_onefile.csv".format(
    blob_container, storage_account_name
)

# repartition(1) makes Spark emit a single part file. The path above becomes a
# directory holding that part-*.csv file (plus any Spark marker files), not a bare CSV.
(
    SaudiSupplyChain
    .repartition(1)
    .write
    .mode("overwrite")
    .option("header", "true")
    .csv(saudi_supply_chain_Path)
)

In [22]:
"""
Write the Saudi companies lookup to a file
"""

In [23]:
# Destination for the distinct list of Saudi companies.
SaudiCompanies_Path = "wasbs://" + blob_container + "@" + storage_account_name + ".blob.core.windows.net/saudi_companies.csv"

# Build the lookup under a new name instead of overwriting df_SaudiCompanies: the
# filtered history frame must keep RELATED_COMPANY_TICKER, otherwise re-running the
# EXCHANGE_CODE cell after this one fails.
df_SaudiCompanies_lookup = df_SaudiCompanies.select('TICKER_AND_EXCHANGE_CODE', 'LONG_COMP_NAME').distinct()

# repartition(1) yields a single part file inside the SaudiCompanies_Path directory.
df_SaudiCompanies_lookup.repartition(1).write.format("csv").save(SaudiCompanies_Path, header='true', mode="overwrite")