# Investment Transformations

In [None]:
# Import libraries and set SPARK_HOME
from pyspark.sql import SparkSession,Window
import os
from pyspark.sql.functions import col,lit
import pyspark.sql.functions as func
os.environ['SPARK_HOME'] = '/Users/mehmet.senturk/opt/miniconda3/envs/investment_insights/lib/python3.7/site-packages/pyspark'

In [None]:
# Create a spark session
spark = SparkSession.builder.appName("Investment Transformations").getOrCreate()

In [None]:
# Creating DataFrames for each parit
gold_prices_df = spark.read.parquet('../output/gold_tl_parity_stream-*.parquet')
dollar_prices_df = spark.read.parquet('../output/dollar_tl_parity_stream-*.parquet')
euro_prices_df = spark.read.parquet('../output/euro_tl_parity_stream-*.parquet')

In [None]:
gold_prices_df.show()

In [None]:
dollar_prices_df = dollar_prices_df.select('date','parity','buying_price',func.lead('date',1,'9999-12-31').over(Window.orderBy('date')).alias('next_date'))

In [None]:
euro_prices_df = euro_prices_df.select('date','parity','buying_price',func.lead('date',1,'9999-12-31').over(Window.orderBy('date')).alias('next_date'))

In [None]:
# To interpolate missing dates in dollar and euro data, join condition is given like below
all_instruments_df = gold_prices_df.join(
    dollar_prices_df
    ,(gold_prices_df['date'] < dollar_prices_df['next_date']) & (gold_prices_df['date'] >= dollar_prices_df['date'])
    ,'left'
).join(
euro_prices_df
    ,(gold_prices_df['date'] < euro_prices_df['next_date']) & (gold_prices_df['date'] >= euro_prices_df['date'])
    ,'left'
).select(
    gold_prices_df['date']
    ,gold_prices_df['buying_price'].alias('gold_buying_price')
    ,dollar_prices_df['buying_price'].alias('dollar_buying_price')
    ,euro_prices_df['buying_price'].alias('euro_buying_price')
)

In [None]:
investments_df = spark.read.parquet('../output/Investments-*.parquet').select(
    col('investment_instrument')
    ,func.round(col('price').cast('float'),2).alias('price')
    ,col('investment_date').cast('date')
    ,func.round(col('amount').cast('float'),2).alias('amount')
    ,func.lead('investment_date',1,'9999-12-31').over(Window.partitionBy('investment_instrument').orderBy('investment_date')).alias('next_investment_date')
    ,func.sum(col('amount') * col('price')).over(Window.partitionBy('investment_instrument').orderBy('investment_date')).alias('cumulative_amount_in_turkish_liras')
    ,func.sum('amount').over(Window.partitionBy('investment_instrument').orderBy('investment_date')).alias('cumulative_amount_in_investment_instrument')
    )

In [None]:
investments_df.show(10)

In [None]:
all_instruments_df.show(10)

In [None]:
dollar_investments_df = investments_df.where("investment_instrument = 'Dollar'")
euro_investments_df = investments_df.where("investment_instrument = 'Euro'")
gold_investments_df = investments_df.where("investment_instrument = 'Gold'")

In [None]:
gold_investments_df.collect()

In [None]:
final_df = all_instruments_df.alias('main').join(
    dollar_investments_df.alias('dollar')
    ,(col('main.date') < col('dollar.next_investment_date')) & (col('main.date') >= col('dollar.investment_date'))
    ,'left'
).join(
    euro_investments_df.alias('euro')
    ,(col('main.date') < col('euro.next_investment_date')) & (col('main.date') >= col('euro.investment_date'))
    ,'left'
).join(
    gold_investments_df.alias('gold')
    ,(col('main.date') < col('gold.next_investment_date')) & (col('main.date') >= col('gold.investment_date'))
    ,'left'
).select(
    'date'
    ,'gold_buying_price'
    ,'dollar_buying_price'
    ,'euro_buying_price'
    ,func.coalesce(col('dollar.cumulative_amount_in_turkish_liras'),lit(0)).alias('dollar_cumulative_amount_in_tl')
    ,func.coalesce(col('dollar.cumulative_amount_in_investment_instrument'),lit(0)).alias('dollar_cumulative_amount_in_investment_instrument')
    ,func.coalesce((col('dollar.cumulative_amount_in_investment_instrument') * col('dollar_buying_price')),lit(0)).alias('value_of_dollar_investment_in_tl')
    ,func.coalesce(col('euro.cumulative_amount_in_turkish_liras'),lit(0)).alias('euro_cumulative_amount_in_tl')
    ,func.coalesce(col('euro.cumulative_amount_in_investment_instrument'),lit(0)).alias('euro_cumulative_amount_in_investment_instrument')
    ,func.coalesce((col('euro.cumulative_amount_in_investment_instrument') * col('euro_buying_price')),lit(0)).alias('value_of_euro_investment_in_tl')
    ,func.coalesce(col('gold.cumulative_amount_in_turkish_liras'),lit(0)).alias('gold_cumulative_amount_in_tl')
    ,func.coalesce(col('gold.cumulative_amount_in_investment_instrument'),lit(0)).alias('gold_cumulative_amount_in_investment_instrument')
    ,func.coalesce((col('gold.cumulative_amount_in_investment_instrument') * col('gold_buying_price')),lit(0)).alias('value_of_gold_investment_in_tl')
)

In [None]:
final_df.createOrReplaceTempView('final')

In [None]:
final_df.orderBy('date',ascending = False).limit(10).show() 

In [None]:
final_df.where("date = '2023-01-21'").select(
    (col('value_of_dollar_investment_in_tl') - col('dollar_cumulative_amount_in_tl')).alias('dollar_revenue')
    ,(col('value_of_euro_investment_in_tl') - col('euro_cumulative_amount_in_tl')).alias('euro_revenue')
    ,(col('value_of_gold_investment_in_tl') - col('gold_cumulative_amount_in_tl')).alias('gold_revenue')
).select(
    '*'
    ,(col('dollar_revenue') + col('euro_revenue') + col('gold_revenue')).alias('total_revenue')
).show()

In [None]:
final_df.where("date = '2023-01-21'").rdd.take(1)[0].asDict()