In [1]:
import os
import requests
import json
import yfinance as yf
import pandas as pd
import numpy as np
from datetime import date
import glob

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql import SparkSession
from pyspark.sql.functions import array, explode, col, from_unixtime, lit, map_from_arrays, make_date, sum, when

In [2]:
import os
import sys

# Make the Spark workers and the Spark driver use the very interpreter that is
# running this notebook.
os.environ.update(
    {
        "PYSPARK_PYTHON": sys.executable,
        "PYSPARK_DRIVER_PYTHON": sys.executable,
    }
)

In [3]:
# Build (or reuse) a local Spark session whose Python worker and driver are
# pinned to the interpreter running this kernel.
_session_builder = SparkSession.builder.master("local[*]").appName("LocalPySpark")
for _python_conf_key in ("spark.pyspark.python", "spark.pyspark.driver.python"):
    _session_builder = _session_builder.config(_python_conf_key, sys.executable)
spark = _session_builder.getOrCreate()

In [4]:
# Root folder of the local copy of the data lake. Set the YAHOO_PIPELINE_ROOT
# environment variable to run the notebook on another machine; when it is not
# set, the original hard-coded location is used.
local_root = os.environ.get(
    "YAHOO_PIPELINE_ROOT",
    r"C:\Users\alexa\OneDrive\Bureau\Data Engineering\Yahoo DataBricks Pipeline",
)
# DBFS root prefixes. No cell visible in this notebook reads them.
db_root_python = "/dbfs"
db_root_spark = "dbfs:"

In [5]:
# Layer folders, relative to the chosen root.
bronze_path = "/FileStore/bronze/yahoo"
silver_path = "/FileStore/silver/yahoo"

# Tickers whose financial statements are downloaded and forecast.
# NOTE(review): 'KRN' and 'SQN' do not appear in the per-ticker forecast counts
# printed further down - confirm these symbols resolve on Yahoo.
my_portfolio = (
    'GOOG',
    'SOPH',
    'PYPL',
    'NOV',
    'KRN',
    'AMZN',
    'NVDA',
    'SQN',
    'TGT',
)

# 1. call yahoo api & store bronze layer data

In [6]:
def unpivot_pandas_statement(ticker, statement):
    """Reshape a wide financial statement into long (tidy) form.

    Parameters
    ----------
    ticker : value stored in the ``ticker`` column of every output row.
    statement : pandas.DataFrame
        One row per metric (the index) and one column per reporting date.

    Returns
    -------
    pandas.DataFrame
        Columns ``metric``, ``date``, ``value`` and ``ticker``; one row per
        (metric, date) pair of the input.
    """
    long_statement = (
        statement
        # Name the index up front so that reset_index() always yields a
        # 'metric' column, whether or not the incoming index already had a
        # name (an already-named index used to break the melt on 'index').
        .rename_axis(index='metric')
        .reset_index()
        .melt(id_vars='metric', var_name='date', value_name='value')
    )
    long_statement['ticker'] = ticker
    return long_statement

In [7]:
def get_raw_yahoo_financial_statements(src_root_path, ticker):
    """Download the three financial statements of ``ticker`` and save them as JSON.

    For the balance sheet, the income statement and the cash-flow statement
    (in that order) the statement is fetched through yfinance, unpivoted with
    ``unpivot_pandas_statement`` and written to
    ``{src_root_path}/{ticker}_<label>_{today}.json`` with ``orient='records'``.

    Any exception is caught and printed, so the function never raises; the
    statements that follow the failing one are not written.
    """
    # (yfinance getter name, label used in the output file name)
    statements = (
        ('get_balance_sheet', 'balance_sheet'),
        ('get_income_stmt', 'income_stmt_sheet'),
        ('get_cashflow', 'cashflow'),
    )
    try:
        current_ticker = yf.Ticker(ticker)
        for getter_name, file_label in statements:
            raw_statement = getattr(current_ticker, getter_name)()
            long_statement = unpivot_pandas_statement(ticker, raw_statement)
            long_statement.to_json(
                f'{src_root_path}/{ticker}_{file_label}_{date.today()}.json',
                orient='records',
            )
    except (requests.ConnectionError, requests.Timeout) as e:
        print(f"Connection error for {ticker}: {e}")
    except Exception as e:
        print(f"Unexpected error for {ticker}: {e}")


In [8]:
# Bronze ingestion is switched off: the loop below is wrapped in a string
# literal, so running this cell only echoes the text and does not call
# get_raw_yahoo_financial_statements. Remove the triple quotes to download the
# statements again.
"""for stock in my_portfolio:
    print(f'processing stock {stock}')
    get_raw_yahoo_financial_statements(local_root+bronze_path, stock)"""

"for stock in my_portfolio:\n    print(f'processing stock {stock}')\n    get_raw_yahoo_financial_statements(local_root+bronze_path, stock)"

# 2 Load income statement + balance sheet + cashflows and UNION from bronze files

In [9]:
def convert_epoch_to_date(df):
    """Replace the epoch-millisecond ``date`` column with a ``datetime`` date column.

    The calendar day is derived arithmetically (whole days elapsed since
    1970-01-01 UTC) rather than through ``from_unixtime``. ``from_unixtime``
    renders the timestamp in the Spark session time zone, so a midnight-UTC
    reporting date such as 2024-12-31 would come out as 2024-12-30 on a
    machine whose zone is behind UTC.

    Parameters
    ----------
    df : Spark DataFrame with a numeric ``date`` column divided here by
        86,400,000, i.e. treated as milliseconds since the Unix epoch.

    Returns
    -------
    Spark DataFrame with a ``datetime`` column of DateType and without the
    ``date`` column.
    """
    millis_per_day = 86_400_000
    days_since_epoch = F.floor(F.col("date") / millis_per_day).cast(T.IntegerType())
    epoch_start = F.lit("1970-01-01").cast(T.DateType())
    return df.withColumn("datetime", F.date_add(epoch_start, days_since_epoch)).drop("date")
def remove_null_values(df):
    """Return the rows of ``df`` whose ``value`` column is not NULL."""
    has_value = "value IS NOT NULL"
    return df.filter(has_value)

In [10]:
# The bronze files are listed with glob; per the original note, this is more
# efficient than letting Spark list the directory.
_bronze_dir = f'{local_root}{bronze_path}'

income_stmt_file_names = list(glob.glob(os.path.join(_bronze_dir, "*income_stmt_sheet*.json")))

balance_sheet_file_names = list(glob.glob(os.path.join(_bronze_dir, "*balance_sheet*.json")))

cashflow_file_names = list(glob.glob(os.path.join(_bronze_dir, "*cashflow*.json")))

In [17]:
# One Spark DataFrame per statement type, each read from every matching bronze file.
sdf_all_income_stmt = spark.read.json(income_stmt_file_names)
sdf_all_balance_sheet = spark.read.json(balance_sheet_file_names)
sdf_all_cashflow = spark.read.json(cashflow_file_names)

# Row count of each frame, printed as a quick sanity check.
print('length of files :')
for _frame_name, _frame in (
    ('sdf_all_income_stmt', sdf_all_income_stmt),
    ('sdf_all_balance_sheet', sdf_all_balance_sheet),
    ('sdf_all_cashflow', sdf_all_cashflow),
):
    print(f'{_frame_name} : {_frame.count()}')

length of files :
sdf_all_income_stmt : 1446
sdf_all_balance_sheet : 2695
sdf_all_cashflow : 1928


In [19]:
# Stack the three statements into one long frame. Columns are matched by name
# rather than by position, so the result does not depend on the column order
# that JSON schema inference produced for each frame.
sdf_financials = (
    sdf_all_income_stmt
    .unionByName(sdf_all_balance_sheet)
    .unionByName(sdf_all_cashflow)
)
# Drop rows without a value, then turn the epoch `date` into a `datetime` date.
sdf_financials = remove_null_values(sdf_financials)
sdf_financials = convert_epoch_to_date(sdf_financials)
# The count reported is that of the combined frame, so label it accordingly.
print(f'sdf_financials : {sdf_financials.count()}')

sdf_all_income_stmt : 4820


# Selection of financial statement line items

In [46]:
# --- Income statement: line items --------------------------------------------
revenue = "TotalRevenue"
cogs = "CostOfRevenue"
opex = "OperatingExpense"
da = "ReconciledDepreciation"
interest = "NetInterestIncome"
taxe = "TaxProvision"

# --- Income statement: subtotals ---------------------------------------------
gross_margin = "GrossProfit"
ebitda = "EBITDA"
ebit = "EBIT"
ebt = "PretaxIncome"
net_income = "NetIncome"

# --- Balance sheet -----------------------------------------------------------
accounts_receivable = "AccountsReceivable"
accounts_payable = "AccountsPayable"
inventory = "Inventory"
ppe = "NetPPE"
# Financial assets and other assets are merged into this single Yahoo metric.
other_asset = "OtherNonCurrentAssets"
non_current_liab = "TotalNonCurrentLiabilitiesNetMinorityInterest"
current_liab = "CurrentLiabilities"
intangible_asset = "GoodwillAndOtherIntangibleAssets"
equity = "TotalEquityGrossMinorityInterest"
cash = "CashCashEquivalentsAndShortTermInvestments"

# 3. Explore forecasting methods for sales across multiple tickers

In [38]:
# NOTE(review): both imports bind the same name `LinearRegression`. The
# scikit-learn import runs last, so it is the class every later cell sees;
# the pyspark.ml class imported on the first line is shadowed and unreachable
# under that name.
from pyspark.ml.regression import LinearRegression
from sklearn.linear_model import LinearRegression

In [39]:
# Keep only the revenue rows and add the calendar year of each reporting date;
# `year` is the regressor used by the forecasting step.
sdf_total_revenue = (
    sdf_financials
    .where(F.col('metric') == revenue)
    .withColumn("year", F.year('datetime'))
)

In [40]:
def fit_and_forecast_pandas(pdf: pd.DataFrame) -> pd.DataFrame:
    """Fit ``value = a + b * year`` on one ticker's rows and predict 2025-2030.

    Parameters
    ----------
    pdf : pandas.DataFrame
        Rows of a single group; the ``year``, ``value`` and ``ticker`` columns
        are read.

    Returns
    -------
    pandas.DataFrame
        One row per forecast year with columns ``ticker`` (taken from the first
        input row), ``year``, ``metric`` (the module-level ``revenue`` name)
        and ``pred`` (the model prediction).
    """
    forecast_years = list(range(2025, 2031))

    # A one-column DataFrame is used for both fit and predict, so the feature
    # is called 'year' in both places.
    model = LinearRegression()
    model.fit(pdf[["year"]], pdf["value"])
    predictions = model.predict(pd.DataFrame({'year': forecast_years}))

    return pd.DataFrame({
        "ticker": pdf["ticker"].iloc[0],
        "year": forecast_years,
        "metric": revenue,
        "pred": predictions,
    })

In [41]:
# Fit one regression per ticker: each ticker's rows are handed to
# fit_and_forecast_pandas as a pandas DataFrame.
_forecast_schema = "ticker string, year int, metric string, pred double"
sdf_sales_forecast = sdf_total_revenue.groupBy("ticker").applyInPandas(
    fit_and_forecast_pandas, schema=_forecast_schema
)

In [42]:
# Stamp every forecast row with 31 December of its forecast year, then drop the
# helper `year` column.
# NOTE: this cell overwrites its own input, so it cannot be re-run without
# first re-running the cell that builds sdf_sales_forecast.
_year_end = make_date(col('year'), lit("12"), lit("31"))
sdf_sales_forecast = sdf_sales_forecast.withColumn('datetime', _year_end).drop("year")

# building income statement - forecast for multiple tickers at once

In [43]:
def compute_pct_metric_to_base(df, metric, base):
    """Per ticker, divide the summed ``metric`` values by the summed ``base`` values.

    Parameters
    ----------
    df : Spark DataFrame with ``ticker``, ``metric`` and ``value`` columns.
    metric : metric name whose values form the numerator.
    base : metric name whose values form the denominator.

    Returns
    -------
    Spark DataFrame with one row per ticker and the columns ``ticker`` and
    ``avg_{metric}_to_{base}_pct``. Despite the ``avg`` prefix, the figure is
    a ratio of sums over all rows of the ticker, not a mean of per-period
    ratios.
    """
    # Rows belonging to other metrics contribute 0 to each total.
    metric_total = F.sum(F.when(F.col('metric') == metric, F.col('value')).otherwise(0))
    base_total = F.sum(F.when(F.col('metric') == base, F.col('value')).otherwise(0))
    ratio = (metric_total / base_total).alias(f'avg_{metric}_to_{base}_pct')
    return df.groupBy(F.col('ticker')).agg(ratio)

In [44]:
def compute_forecast_metrics(sales_forecast, df):
    """Derive forecasts of the other income-statement items from the sales forecast.

    Each item is forecast as ``forecast of its base * historical item/base
    ratio`` (ratio from ``compute_pct_metric_to_base``). Cost of revenue,
    operating expense, depreciation and net interest use revenue as base.

    The tax provision uses pretax income as base. No pretax-income forecast
    exists, so it is itself approximated with the percent-of-sales method
    (``pred * pretax/revenue``) before the tax ratio is applied. Previously
    the tax/pretax ratio was multiplied by forecast *revenue*, which
    overstated the tax line.

    Parameters
    ----------
    sales_forecast : Spark DataFrame with ``ticker``, ``metric``, ``pred`` and
        ``datetime`` columns holding the revenue forecast.
    df : Spark DataFrame of historical values with ``ticker``, ``metric`` and
        ``value`` columns.

    Returns
    -------
    Spark DataFrame with the rows of ``sales_forecast`` followed by one block
    of rows per derived metric, same columns.
    """
    metrics = {cogs: revenue, opex: revenue, da: revenue, interest: revenue, taxe: ebt}
    result_df = sales_forecast
    for metric, base in metrics.items():
        pct = compute_pct_metric_to_base(df, metric, base)
        scaled = sales_forecast.join(pct, on="ticker")
        factor = col(f'avg_{metric}_to_{base}_pct')
        if base != revenue:
            # `pred` is a revenue forecast: convert it into a forecast of the
            # base first, using the base's own historical share of revenue.
            base_pct = compute_pct_metric_to_base(df, base, revenue)
            scaled = scaled.join(base_pct, on="ticker")
            factor = factor * col(f'avg_{base}_to_{revenue}_pct')
        forecast = scaled.select(
            (col('pred') * factor).alias('pred'),
            lit(metric).alias('metric'),
            col('datetime'),
            col('ticker'),
        )
        result_df = result_df.unionByName(forecast)
    return result_df

In [45]:
# Sanity check: number of forecast rows produced for each ticker.
forecast_rows_per_ticker = (
    compute_forecast_metrics(sdf_sales_forecast, sdf_financials)
    .groupBy(col('ticker'))
    .count()
)
forecast_rows_per_ticker.show()

+------+-----+
|ticker|count|
+------+-----+
|   NOV|   36|
|  GOOG|   36|
|   TGT|   36|
|  PYPL|   36|
|  AMZN|   36|
|  NVDA|   36|
|  SOPH|   36|
+------+-----+



# Balance sheet analysis

In [51]:
# AMZN balance-sheet rows at 2024-12-31.
# The frame is built from `sdf_all_balance_sheet`, which the loading cell
# defines; `spark_df_all_balance_sheet` is not defined by any earlier cell, so
# the previous version only worked with state left over in the kernel. The
# epoch `date` column is converted first because the filter needs `datetime`.
temp = convert_epoch_to_date(sdf_all_balance_sheet).filter(
    (col('ticker') == 'AMZN') & (col('datetime') == '2024-12-31')
)

In [52]:
# Total assets: sum of the balance-sheet items treated as assets.
_asset_metrics = (accounts_receivable, inventory, cash, intangible_asset, other_asset, ppe)
asset = (
    temp
    .filter(col('metric').isin(*_asset_metrics))
    .agg(F.sum(col('value')).alias('total asset'))
)

In [53]:
# Each aggregate below is a one-row, one-column DataFrame.
payables = (
    temp
    .filter(col('metric') == accounts_payable)
    .agg(F.sum(col('value')).alias('payables'))
)
liab = (
    temp
    .filter(col('metric').isin(current_liab, non_current_liab))
    .agg(F.sum(col('value')).alias('liab'))
)
# Liabilities other than accounts payable = total liabilities - payables.
other_liab = liab.crossJoin(payables).selectExpr("liab - payables as other_liab")
_equity = (
    temp
    .filter(col('metric') == equity)
    .agg(F.sum(col('value')).alias('equity'))
)

In [54]:
## verification that the balance sheet balances:
## equity + other liabilities + payables - assets (the recorded run gave 0.0)
_equity_value = _equity.collect()[0][0]
_other_liab_value = other_liab.collect()[0][0]
_payables_value = payables.collect()[0][0]
_asset_value = asset.collect()[0][0]
_equity_value + _other_liab_value + _payables_value - _asset_value

0.0

# Balance sheet forecasting

### DSO, DIO & DPO calculations 

In [None]:
## DSO (days sales outstanding)     = trade receivables / sales * 360
## DPO (days payables outstanding)  = trade payables / COGS * 360
## DIO (days inventory outstanding) = inventory / COGS * 360

DataFrame[ticker: string, datetime: date, metric: string, value: double, metric: string, value: double]

In [73]:
# Revenue and cost of revenue per (ticker, reporting date), one column each.
# Built from `sdf_all_income_stmt` (defined by the loading cell) after the
# epoch conversion; `spark_df_all_income_stmt` is not defined by any earlier
# cell. The pivot values are listed explicitly so the output columns are fixed.
temp = (
    convert_epoch_to_date(sdf_all_income_stmt)
    .filter(col('metric').isin(cogs, revenue))
    .groupBy("ticker", "datetime")
    .pivot('metric', [cogs, revenue])
    .agg(F.first('value'))
)

# Balance-sheet rows enriched with the revenue / COGS of the same ticker and
# date. The frame is derived from `sdf_all_balance_sheet` instead of from
# itself, so re-running the cell does not keep appending the pivoted columns.
spark_df_all_balance_sheet = (
    convert_epoch_to_date(sdf_all_balance_sheet)
    .join(temp, on=['ticker', 'datetime'])
)

# forecast balance sheet items