<a href="https://colab.research.google.com/github/jofefer/jofefer/blob/main/creditsuisse_spark_execise.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://apache.uvigo.es/spark/spark-3.2.1/spark-3.2.1-bin-hadoop2.7.tgz
!tar xf spark-3.2.1-bin-hadoop2.7.tgz
!pip install -q findspark
!pip install py4j
!wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
!unzip ngrok-stable-linux-amd64.zip



# Importamos variables de entorno
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.1-bin-hadoop2.7"

!pip install pyspark

Collecting py4j
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
[?25l[K     |█▋                              | 10 kB 18.3 MB/s eta 0:00:01[K     |███▎                            | 20 kB 22.5 MB/s eta 0:00:01[K     |█████                           | 30 kB 19.8 MB/s eta 0:00:01[K     |██████▋                         | 40 kB 13.3 MB/s eta 0:00:01[K     |████████▎                       | 51 kB 5.6 MB/s eta 0:00:01[K     |█████████▉                      | 61 kB 6.5 MB/s eta 0:00:01[K     |███████████▌                    | 71 kB 7.3 MB/s eta 0:00:01[K     |█████████████▏                  | 81 kB 7.8 MB/s eta 0:00:01[K     |██████████████▉                 | 92 kB 8.6 MB/s eta 0:00:01[K     |████████████████▌               | 102 kB 7.7 MB/s eta 0:00:01[K     |██████████████████▏             | 112 kB 7.7 MB/s eta 0:00:01[K     |███████████████████▊            | 122 kB 7.7 MB/s eta 0:00:01[K     |█████████████████████▍          | 133 kB 7.7 MB/s eta 0:00:01

In [117]:
from pyspark.sql import SparkSession
import findspark

findspark.init("spark-3.2.1-bin-hadoop2.7")


In [118]:
from pyspark.sql import SparkSession

spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

In [119]:
sc= spark.sparkContext

In [120]:
df = spark.read.csv('fx_rate.csv',header=True, inferSchema=True)
df.show()

+----------+-------+-------+-------+
|   fx_date|CHF_USD|EUR_USD|JPY_USD|
+----------+-------+-------+-------+
|2022-01-09|1.08667|   null|0.00871|
|2022-01-07|1.08667|1.15065|0.00871|
|2022-01-04|1.08453|1.14993|0.00856|
|2022-01-02|1.08246|1.14898|   null|
|2022-01-01|1.08345|1.14508|0.00889|
+----------+-------+-------+-------+



In [121]:
from pyspark.sql import functions as f, Window

In [122]:
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

In [123]:
max_date = df.agg({"fx_date": "max"}).collect()[0][0]
min_date = df.agg({"fx_date": "min"}).collect()[0][0]
print(max_date)

2022-01-09


In [124]:
import pandas as pd

df_dates = pd.DataFrame(pd.date_range(start=min_date, end=max_date),columns=['date'])
df_dates.head()
spark_dates = spark.createDataFrame(df_dates)
spark_dates = spark_dates.withColumn("date",f.to_date('date'))

spark_dates.show()

+----------+
|      date|
+----------+
|2022-01-01|
|2022-01-02|
|2022-01-03|
|2022-01-04|
|2022-01-05|
|2022-01-06|
|2022-01-07|
|2022-01-08|
|2022-01-09|
+----------+



In [125]:
df_join= spark_dates.join(df, spark_dates.date == df.fx_date, 'left').drop('fx_date').withColumnRenamed('date','fx_date')
df_join.show()

+----------+-------+-------+-------+
|   fx_date|CHF_USD|EUR_USD|JPY_USD|
+----------+-------+-------+-------+
|2022-01-01|1.08345|1.14508|0.00889|
|2022-01-02|1.08246|1.14898|   null|
|2022-01-03|   null|   null|   null|
|2022-01-04|1.08453|1.14993|0.00856|
|2022-01-05|   null|   null|   null|
|2022-01-06|   null|   null|   null|
|2022-01-07|1.08667|1.15065|0.00871|
|2022-01-08|   null|   null|   null|
|2022-01-09|1.08667|   null|0.00871|
+----------+-------+-------+-------+



In [126]:

window_last = Window.orderBy("fx_date")
df_2 = df_join
for col in df.columns[1:]:
  df_2 = df_2.withColumn(col, f.last(col, ignorenulls=True).over(window_last))


# lets assume that all the exanges are with USD so always is XXX_USD
df_2 = df_2.toDF(*(c.replace('_USD', '') for c in df.columns))
df_2.show()


+----------+-------+-------+-------+
|   fx_date|    CHF|    EUR|    JPY|
+----------+-------+-------+-------+
|2022-01-01|1.08345|1.14508|0.00889|
|2022-01-02|1.08246|1.14898|0.00889|
|2022-01-03|1.08246|1.14898|0.00889|
|2022-01-04|1.08453|1.14993|0.00856|
|2022-01-05|1.08453|1.14993|0.00856|
|2022-01-06|1.08453|1.14993|0.00856|
|2022-01-07|1.08667|1.15065|0.00871|
|2022-01-08|1.08667|1.15065|0.00871|
|2022-01-09|1.08667|1.15065|0.00871|
+----------+-------+-------+-------+



For each customer we would like to know the total balance that they own in all their accounts (i.e. the
sum of all the balances) in USD (eventually converted using today's exchange rate).
Task: create a dataframe with two columns: customer_id and tot_USD_balance containing the
requested information


In [127]:
accounts = spark.read.csv('accounts.csv',header=True, inferSchema=True)
accounts.show()

+----------+-----------+--------+----------+
|account_id|customer_id|currency|   balance|
+----------+-----------+--------+----------+
|         1|         10|     EUR| 108999.21|
|         2|         10|     CHF|8910772.81|
|         3|         27|     USD|  189000.0|
|         4|        899|     JPY| 3587612.0|
|         5|         54|     USD|   87688.0|
|         6|         23|     CHF| 124000.49|
|         8|         89|     EUR|  34559.89|
|         9|        190|     EUR| 458089.77|
|        10|        190|     USD|       0.0|
+----------+-----------+--------+----------+



In [128]:
df_2.createOrReplaceTempView("rates")

In [129]:
print(max_date)
current_fx_pd = spark.sql(f"SELECT * FROM rates WHERE fx_date ='{max_date}'").drop('fx_date').toPandas()
current_fx_pd['USD'] = 1
current_fx_pd.head()

2022-01-09


Unnamed: 0,CHF,EUR,JPY,USD
0,1.08667,1.15065,0.00871,1


In [130]:
#Traspose the table

current_fx_pd = current_fx_pd.T
current_fx_pd.reset_index(level=0, inplace=True)
current_fx_pd.head()

current_fx = spark.createDataFrame(current_fx_pd)
current_fx = current_fx.withColumnRenamed('index','currency_name')
current_fx = current_fx.withColumnRenamed('0','rate')
current_fx.show()

+-------------+-------+
|currency_name|   rate|
+-------------+-------+
|          CHF|1.08667|
|          EUR|1.15065|
|          JPY|0.00871|
|          USD|    1.0|
+-------------+-------+



In [131]:
account_joined = accounts.join(current_fx, accounts.currency == current_fx.currency_name, 'left')
account_joined.show()

+----------+-----------+--------+----------+-------------+-------+
|account_id|customer_id|currency|   balance|currency_name|   rate|
+----------+-----------+--------+----------+-------------+-------+
|         2|         10|     CHF|8910772.81|          CHF|1.08667|
|         6|         23|     CHF| 124000.49|          CHF|1.08667|
|         1|         10|     EUR| 108999.21|          EUR|1.15065|
|         8|         89|     EUR|  34559.89|          EUR|1.15065|
|         9|        190|     EUR| 458089.77|          EUR|1.15065|
|         4|        899|     JPY| 3587612.0|          JPY|0.00871|
|         3|         27|     USD|  189000.0|          USD|    1.0|
|         5|         54|     USD|   87688.0|          USD|    1.0|
|        10|        190|     USD|       0.0|          USD|    1.0|
+----------+-----------+--------+----------+-------------+-------+



In [132]:
account_joined.createOrReplaceTempView("accounts")

In [106]:
spark.sql("""

select customer_id, sum(balance * rate) as usd_balance
from accounts
group by customer_id


""").show()

+-----------+------------------+
|customer_id|       usd_balance|
+-----------+------------------+
|         27|          189000.0|
|        190|    527100.9938505|
|         54|           87688.0|
|         23|    134747.6124683|
|         10|   9808489.4304292|
|        899|31248.100520000004|
|         89|39766.337428499995|
+-----------+------------------+



In [133]:
# THIS is if we dont add USD as 1 in the conversion

spark.sql("""
with fix_balance as 
(
  select * , case when currency = 'USD' then balance
            else balance * rate
            END as usd_balance
  from accounts 
)
select customer_id, round(sum(usd_balance),2) as usd_balance
from fix_balance
group by customer_id

""").show()

+-----------+-----------+
|customer_id|usd_balance|
+-----------+-----------+
|         27|   189000.0|
|        190|  527100.99|
|         54|    87688.0|
|         23|  134747.61|
|         10| 9808489.43|
|        899|    31248.1|
|         89|   39766.34|
+-----------+-----------+



Exercise 3 (bonus):

For each customer we would like to know the total balance that they own in all their accounts (i.e. the
sum of all the balances) in their local currency (eventually converted using today's exchange rate).

In [134]:
customers = spark.read.csv('customers.csv',header=True, inferSchema=True)
customers.show()

+-----------+------+---------+----------+-------------+
|customer_id|  name|  surname|country_id|date_of_birth|
+-----------+------+---------+----------+-------------+
|          1|  Enzo|  Ferrari|        IT|   1980-01-01|
|         27|  John|      Doe|        US|   1990-01-01|
|          3|George|    Black|        UK|   1970-01-01|
|         23|Alvaro|  Sanchez|        JP|   1964-01-01|
|         54| Louis|   Dupont|        FR|   1980-01-01|
|          6|  John|    White|        US|   1977-01-01|
|        190|Thomas|Schneider|        CH|   1978-01-01|
|         10| Ramon|   Blanco|        ES|   1983-01-01|
|        899|  Emma|  Laurent|        FR|   1985-01-01|
|         89|Oliver|     Lahm|        DE|   1950-01-01|
+-----------+------+---------+----------+-------------+



In [135]:
dict_currency = spark.read.csv('dict_currency.csv',header=True, inferSchema=True)
dict_currency.show()

+-------+---------+
|country|local_cur|
+-------+---------+
|     IT|      EUR|
|     FR|      EUR|
|     DE|      EUR|
|     ES|      EUR|
|     CH|      CHF|
|     US|      USD|
|     JP|      JPY|
+-------+---------+



In [143]:
acc_cust = accounts.join(customers, accounts.customer_id == customers.customer_id, 'left').drop(customers.customer_id)
acc_local = acc_cust.join(dict_currency, acc_cust.country_id == dict_currency.country, 'left').drop(dict_currency.country)
acc_fx = acc_local.join(current_fx, acc_local.currency == current_fx.currency_name, 'left').drop(current_fx.currency_name).withColumnRenamed('rate','usd_rate')
acc_fx_local = acc_fx.join(current_fx, acc_fx.local_cur == current_fx.currency_name, 'left').drop(current_fx.currency_name).withColumnRenamed('rate','usd_local')\
              .withColumn('usd_balance', f.col('balance') * f.col('usd_rate'))\
              .withColumn('local_balance', f.col('usd_balance') / f.col('usd_local'))
                    
acc_fx_local.show()

+----------+-----------+--------+----------+------+---------+----------+-------------+---------+--------+---------+------------------+--------------------+
|account_id|customer_id|currency|   balance|  name|  surname|country_id|date_of_birth|local_cur|usd_rate|usd_local|       usd_balance|       local_balance|
+----------+-----------+--------+----------+------+---------+----------+-------------+---------+--------+---------+------------------+--------------------+
|         9|        190|     EUR| 458089.77|Thomas|Schneider|        CH|   1978-01-01|      CHF| 1.15065|  1.08667|    527100.9938505|   485060.7763631093|
|        10|        190|     USD|       0.0|Thomas|Schneider|        CH|   1978-01-01|      CHF|     1.0|  1.08667|               0.0|                 0.0|
|         2|         10|     CHF|8910772.81| Ramon|   Blanco|        ES|   1983-01-01|      EUR| 1.08667|  1.15065|   9683069.4894427|   8415303.949457003|
|         1|         10|     EUR| 108999.21| Ramon|   Blanco|   

In [114]:
acc_fx_local.createOrReplaceTempView("fact_table")

In [145]:
spark.sql("""
SELECT customer_id, local_cur, cast(sum(local_balance) as decimal(38,2)) as tot_balance
FROM fact_table
GROUP BY customer_id, local_cur
""").show()

+-----------+---------+-----------+
|customer_id|local_cur|tot_balance|
+-----------+---------+-----------+
|        190|      CHF|  485060.78|
|         10|      EUR| 8524303.16|
|         89|      EUR|   34559.89|
|        899|      EUR|   27156.91|
|         54|      EUR|   76207.36|
|         23|      JPY|15470449.19|
|         27|      USD|  189000.00|
+-----------+---------+-----------+

