# Company KPIs with Scala, Spark and Smart-Edgar
I am planning to use the Edgar data to determine and calculate some financial KPIs and feed these into a Neural Network.
In my prior posts I described how to use my Webservices to request and display Edgar information with the help of Python and Pandas.

In this instalment I show how we can directly use the 'built-in' Java query functionality of Smart-Edgar from Scala in order to calculate some financial KPIs.

## Setup
We install the dependencies with the help of Maven:

In [1]:
%classpath config resolver maven-public http://192.168.1.10:8081/repository/maven-public/
%%classpath add mvn 
ch.pschatzmann:smart-edgar:1.0.2
org.apache.spark:spark-sql_2.11:2.3.2


Added new repo: maven-public


I also import the relevant packages and classes:

In [26]:
import ch.pschatzmann.edgar.utils.Utils
import ch.pschatzmann.edgar.reporting._
import ch.pschatzmann.edgar.reporting.company._

import java.util.Arrays




We also need to provide the login information to the database via system properties or environment variables:
- jdbcDriver - e.g. org.postgresql.Driver
- jdbcURL - e.g. jdbc:postgresql://nuc.local:5432/edgar
- jdbcUser - e.g. edgar
- jdbcPassword - e.g. edgar
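
As a minimal sketch, the four settings can be supplied from the notebook itself through `System.setProperty`, using the example values from the list. This assumes that Smart-Edgar reads the properties when it first opens a database connection, so the cell would have to run before the first query. Host name, user and password are placeholders for your own installation.

```scala
// Connection settings for Smart-Edgar, using the example values from the list above.
// Assumption: the properties are read when the first query opens a connection.
val jdbcSettings = Map(
  "jdbcDriver"   -> "org.postgresql.Driver",
  "jdbcURL"      -> "jdbc:postgresql://nuc.local:5432/edgar",
  "jdbcUser"     -> "edgar",
  "jdbcPassword" -> "edgar"
)

// Register every entry as a JVM system property
jdbcSettings.foreach { case (key, value) => System.setProperty(key, value) }

// Report which keys are now set (the values themselves are deliberately not printed)
jdbcSettings.keys.toSeq.sorted.foreach { key =>
  println(s"$key is set: ${System.getProperty(key) != null}")
}
```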


## No Filter
First we run the query without any filter to display all available data. The result contains different forms and different numbers of months, so it potentially contains duplicate information.

In [27]:
%%time
val values = new CompanyEdgarValuesDB(new CompanySelection().setTradingSymbol("AAPL"))
   .setParameterNames("NetIncomeLoss")
values.size


CPU times: user 0 ns, sys: 555 µs, total: 555 µs 
Wall Time: 698 ms



45

We can display the information in a table. Unfortunately BeakerX cannot handle the custom List types correctly, so we wrap the result in an ArrayList:

In [28]:
values.setFilter(new NoFilter(), false)
new java.util.ArrayList(values.toList)


In order to plot the data we need to convert our Java objects into Scala and select the relevant fields

In [41]:
import scala.collection.JavaConverters._

def plotEdgar(edgar:ICompanyInfo):TimePlot = {
    // copy the result into a plain ArrayList so that it can be converted to Scala
    val list = new java.util.ArrayList(edgar.toList)
    val df = new java.text.SimpleDateFormat("yyyy-MM-dd")
    // x axis: the reporting date, parsed from its yyyy-MM-dd string
    val xList = list.asScala.map(m => m.get("date").asInstanceOf[String]).map(ds => df.parse(ds))
    // y axis: the reported NetIncomeLoss value
    val yList = list.asScala.map(m => m.get("NetIncomeLoss").asInstanceOf[java.lang.Number])
    val plot = new TimePlot()
    plot.add(new Line { x = xList ; y = yList })
    plot
}

plotEdgar(values)
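
The conversion step can be tried without a database. The following sketch builds a `java.util.List` of `java.util.Map` entries by hand, assuming that this is the shape returned by `toList`, and turns it into Scala pairs. The `date` and `NetIncomeLoss` keys are the ones used in `plotEdgar`, the two values are Apple's yearly figures for 2007 and 2008, and `record` and `toPoints` are hypothetical helper names.

```scala
import java.text.SimpleDateFormat
import java.util.{ArrayList => JArrayList, Date, HashMap => JHashMap, List => JList, Map => JMap}
import scala.collection.JavaConverters._

// Build one record in the assumed shape: a Java map from field name to value
def record(date: String, netIncome: Double): JMap[String, Object] = {
  val m = new JHashMap[String, Object]()
  m.put("date", date)
  m.put("NetIncomeLoss", java.lang.Double.valueOf(netIncome))
  m
}

// Convert the Java list into Scala (date, value) pairs.
// Records with a missing date or a missing/non-numeric value are skipped.
def toPoints(list: JList[JMap[String, Object]]): Seq[(Date, Double)] = {
  val df = new SimpleDateFormat("yyyy-MM-dd")
  list.asScala.toList.flatMap { m =>
    for {
      d <- Option(m.get("date")).map(_.toString)
      v <- Option(m.get("NetIncomeLoss")).collect { case n: Number => n.doubleValue }
    } yield (df.parse(d), v)
  }
}

val sample = new JArrayList[JMap[String, Object]]()
sample.add(record("2007-09-29", 3495000000.0))
sample.add(record("2008-09-27", 6119000000.0))

val points = toPoints(sample)
points.foreach { case (d, v) => println(s"$d -> $v") }
```

Wrapping the lookups in `Option` avoids a `NullPointerException` when a filing does not report the requested parameter, which the direct `asInstanceOf` casts would not guard against.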

## FilterQuarterlyCumulated
This filter provides only the valid cumulated quarterly values. We also filter out duplicate information.

In [31]:
values.setFilter(new FilterQuarterlyCumulated(), true)
new java.util.ArrayList(values.toList)


In [32]:
plotEdgar(values)


In [33]:
values.getTable.asInstanceOf[ch.pschatzmann.common.table.TableConsolidated].getMessages

[Replaced [2008-12-27, Apple Inc., AAPL, 0000320193, CA, CA, ELECTRONIC COMPUTERS]: 2255000000.00 -> 2255000000.00, Replaced [2009-12-26, Apple Inc., AAPL, 0000320193, CA, CA, ELECTRONIC COMPUTERS]: 3378000000.00 -> 3378000000.00, Replaced [2010-09-25, Apple Inc., AAPL, 0000320193, CA, CA, ELECTRONIC COMPUTERS]: 14013000000.00 -> 14013000000.00, Replaced [2010-12-25, Apple Inc., AAPL, 0000320193, CA, CA, ELECTRONIC COMPUTERS]: 6004000000.00 -> 6004000000.00, Replaced [2010-12-25, Apple Inc., AAPL, 0000320193, CA, CA, ELECTRONIC COMPUTERS]: 6004000000.00 -> 6004000000.00, Replaced [2011-09-24, Apple Inc., AAPL, 0000320193, CA, CA, ELECTRONIC COMPUTERS]: 25922000000.00 -> 25922000000.00, Replaced [2011-12-31, Apple Inc., AAPL, 0000320193, CA, CA, ELECTRONIC COMPUTERS]: 13064000000.00 -> 13064000000.00, Replaced [2011-12-31, Apple Inc., AAPL, 0000320193, CA, CA, ELECTRONIC COMPUTERS]: 13064000000.00 -> 13064000000.00, Replaced [2012-09-29, Apple Inc., AAPL, 0000320193, CA, CA, ELECTRONIC 

## FilterYearly
This filter provides only the valid 10-K filings.

In [34]:
values.setFilter(new FilterYearly())
new java.util.ArrayList(values.toList)


In [35]:
plotEdgar(values)


We can convert the data to html...

In [36]:
new MIMEContainer("text/html", values.toHtml)

| date | companyName | tradingSymbol | identifier | incorporation | location | sicDescription | NetIncomeLoss |
|---|---|---|---|---|---|---|---|
| 2007-09-29 | Apple Inc. | AAPL | 320193.0 | CA | CA | ELECTRONIC COMPUTERS | 3495000000.0 |
| 2008-09-27 | Apple Inc. | AAPL | 320193.0 | CA | CA | ELECTRONIC COMPUTERS | 6119000000.0 |
| 2009-09-26 | Apple Inc. | AAPL | 320193.0 | CA | CA | ELECTRONIC COMPUTERS | 8235000000.0 |
| 2010-09-25 | Apple Inc. | AAPL | 320193.0 | CA | CA | ELECTRONIC COMPUTERS | 14013000000.0 |
| 2011-09-24 | Apple Inc. | AAPL | 320193.0 | CA | CA | ELECTRONIC COMPUTERS | 25922000000.0 |
| 2012-09-29 | Apple Inc. | AAPL | 320193.0 | CA | CA | ELECTRONIC COMPUTERS | 41733000000.0 |
| 2013-09-28 | Apple Inc. | AAPL | 320193.0 | CA | CA | ELECTRONIC COMPUTERS | 37037000000.0 |
| 2014-09-27 | Apple Inc. | AAPL | 320193.0 | CA | CA | ELECTRONIC COMPUTERS | 39510000000.0 |
| 2015-09-26 | Apple Inc. | AAPL | 320193.0 | CA | CA | ELECTRONIC COMPUTERS | 53394000000.0 |


... or to CSV:

In [37]:
values.toCsv

date;companyName;tradingSymbol;identifier;incorporation;location;sicDescription;NetIncomeLoss
2007-09-29;Apple Inc.;AAPL;0000320193;CA;CA;ELECTRONIC COMPUTERS;3495000000.00
2008-09-27;Apple Inc.;AAPL;0000320193;CA;CA;ELECTRONIC COMPUTERS;6119000000.00
2009-09-26;Apple Inc.;AAPL;0000320193;CA;CA;ELECTRONIC COMPUTERS;8235000000.00
2010-09-25;Apple Inc.;AAPL;0000320193;CA;CA;ELECTRONIC COMPUTERS;14013000000.00
2011-09-24;Apple Inc.;AAPL;0000320193;CA;CA;ELECTRONIC COMPUTERS;25922000000.00
2012-09-29;Apple Inc.;AAPL;0000320193;CA;CA;ELECTRONIC COMPUTERS;41733000000.00
2013-09-28;Apple Inc.;AAPL;0000320193;CA;CA;ELECTRONIC COMPUTERS;37037000000.00
2014-09-27;Apple Inc.;AAPL;0000320193;CA;CA;ELECTRONIC COMPUTERS;39510000000.00
2015-09-26;Apple Inc.;AAPL;0000320193;CA;CA;ELECTRONIC COMPUTERS;53394000000.00
2016-09-24;Apple Inc.;AAPL;0000320193;CA;CA;ELECTRONIC COMPUTERS;45687000000.00
2017-09-30;Apple Inc.;AAPL;0000320193;CA;CA;ELECTRONIC COMPUTERS;48351000000.00
2018-09-29;Apple 

## FilterQuarterValues
This filter provides only the values that cover 3 months.

In [38]:
values.setFilter(new FilterQuarterValues())
new java.util.ArrayList(values.toList)



In [14]:
plotEdgar(values)
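
To see how cumulated values relate to 3-month values, here is a small sketch with made-up numbers. It assumes that 'cumulated' means fiscal year-to-date, so that a single quarter is the difference between two consecutive cumulated values. It illustrates the arithmetic only and makes no claim about how Smart-Edgar derives its result internally; `quarterValues` is a hypothetical helper.

```scala
// Cumulated (year-to-date) values for the four quarters of one fiscal year.
// Illustrative numbers only, not taken from Edgar.
val cumulated = Seq(100.0, 250.0, 370.0, 600.0)

// A single quarter is the cumulated value minus the previous cumulated value.
// The first quarter is compared against 0.0 because the fiscal year starts there.
def quarterValues(ytd: Seq[Double]): Seq[Double] =
  ytd.zip(0.0 +: ytd).map { case (current, previous) => current - previous }

val quarters = quarterValues(cumulated)
println(quarters.mkString(", "))
```

The four single-quarter values add up to the last cumulated value again, which is a handy sanity check when comparing the output of the two filters.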


## Search for Companies
We can also search for companies by indicating the search field name and the corresponding search values. We can also filter out all companies without a ticker symbol:

In [15]:
%%time
val companies = new CompanySearch("companyName","A%","B%").onlyCompaniesWithTradingSymbol(true)

new java.util.ArrayList(companies.toList)

CPU times: user 0 ns, sys: 311 µs, total: 311 µs 
Wall Time: 1 s



Or we can simply select all companies:

In [16]:
%%time
val companies = new CompanySearch().onlyCompaniesWithTradingSymbol(false)

new java.util.ArrayList(companies.toList)

CPU times: user 0 ns, sys: 248 µs, total: 248 µs 
Wall Time: 1 s
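
The `A%` and `B%` search values used in the first search look like SQL `LIKE` patterns. Assuming that this is how they are interpreted, and that a company is selected when it matches at least one value, the matching rule can be sketched in plain Scala. `likeToRegex` and `matchesAny` are hypothetical helpers and not part of Smart-Edgar; the company names are sample input only.

```scala
import java.util.regex.Pattern

// Translate a LIKE-style pattern into a regular expression:
// '%' matches any sequence of characters, '_' matches exactly one character,
// every other character is matched literally.
def likeToRegex(pattern: String): String =
  pattern.map {
    case '%' => ".*"
    case '_' => "."
    case c   => Pattern.quote(c.toString)
  }.mkString

// A name is selected when it matches at least one of the patterns
def matchesAny(name: String, patterns: String*): Boolean =
  patterns.exists(p => name.matches(likeToRegex(p)))

val names = Seq("Apple Inc.", "Boeing Co", "Microsoft Corp")
val selected = names.filter(n => matchesAny(n, "A%", "B%"))
println(selected.mkString(", "))
```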



## Available Parameters
To get a better overview of the parameters that are available in Edgar for AAPL, we can run the following query, which shows the count of parameterName values by form:

In [17]:
val model = new EdgarModel().create()
model.getTableField("values", "unitref").setFilterValues(Arrays.asList("USD"))
model.getTableField("values", "segment").setFilterValues(Arrays.asList(""))
model.getTableField("values", "segmentdimension").setFilterValues(Arrays.asList(""))

val table = new Table()
val valueField = model.getTable("values").getValueField().asInstanceOf[ValueField]
valueField.setSelectedFunction("COUNT(%fld)")                                
table.setValueField(valueField)
table.addRow(model.getNavigationField("company", "tradingSymbol").setFilterValues("AAPL"))
table.addRow(model.getNavigationField("values", "parameterName"))
table.addColumn(model.getNavigationField("values", "form"))
table.execute(model)

new java.util.ArrayList(table.toList)

## Using Spark to calculate KPIs
Finally we demonstrate how different KPIs can be easily calculated with the help of Spark.
We create and start a SparkSession with the help of the BeakerX Spark magic.

In [18]:
%%spark --start
val spark = SparkSession.builder()
    .appName("Edgar")
    .master("local[*]")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.ui.enabled", "false")


## Loading Relevant Parameters
We avoid the complexity around the conversion of Java to Scala objects and simply write the Edgar data to a temporary CSV file, which is then read by Spark.

We use the parameter filter to select the relevant parameters only:

In [19]:
%%time
import spark.implicits._
import org.apache.spark.sql.functions.expr
import org.apache.spark.sql.expressions._
import org.apache.spark.sql.functions._
import ch.pschatzmann.edgar.reporting.company._
import ch.pschatzmann.edgar.utils.Utils

val values = new CompanyEdgarValues(new CompanySelection().setTradingSymbol("AAPL"))
    .setFilter(new FilterYearly())
    .setParameterNames("NetIncomeLoss", "OperatingIncomeLoss", "ResearchAndDevelopmentExpense",
        "CashAndCashEquivalentsAtCarryingValue", "AvailableForSaleSecuritiesCurrent", "AccountsReceivableNetCurrent",
        "Revenues", "SalesRevenueNet", "InventoryNet", "AssetsCurrent", "LiabilitiesCurrent", "Assets",
        "EarningsPerShareBasic", "StockholdersEquity")

val file = Utils.createTempFile(values.toCsv());
println(file)

val edgarAAPLYear = spark.read.format("csv")
    .option("delimiter", ";")
    .option("header", "true")
    .load(file.getAbsolutePath)

edgarAAPLYear.display(1000)

CPU times: user 0 ns, sys: 254 µs, total: 254 µs 
Wall Time: 155 ms
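
Loaded this way, every column arrives as a string, and Spark casts the values implicitly when they are used in a formula. If you prefer typed columns, a schema can be passed to the reader. The sketch below is self-contained under a few assumptions: it writes two rows of Apple's yearly data (reduced to three columns) to a temporary file with plain `java.nio` instead of `Utils.createTempFile`, and it declares `identifier` as a string so that the leading zeros survive. It builds its own local session; in the notebook you would reuse the existing `spark`.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{DateType, DoubleType, StringType, StructField, StructType}

val spark = SparkSession.builder()
  .appName("EdgarCsvSchemaSketch")
  .master("local[*]")
  .getOrCreate()

// Two rows in the same semicolon-separated layout that toCsv produces (three columns only)
val csv =
  "date;identifier;NetIncomeLoss\n" +
  "2007-09-29;0000320193;3495000000.00\n" +
  "2008-09-27;0000320193;6119000000.00\n"
val path = Files.createTempFile("edgar", ".csv")
Files.write(path, csv.getBytes(StandardCharsets.UTF_8))

// Explicit column types: dates as DateType, the identifier as text, the value as a double
val schema = StructType(Seq(
  StructField("date", DateType),
  StructField("identifier", StringType),
  StructField("NetIncomeLoss", DoubleType)
))

val typed = spark.read.format("csv")
  .option("delimiter", ";")
  .option("header", "true")
  .schema(schema)
  .load(path.toString)

typed.printSchema()
typed.show(false)
```

An explicit schema is used here rather than `inferSchema`, because inference would read `0000320193` as a number and drop the leading zeros.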




## KPIs using Formulas
In Spark we can add additional columns by defining a formula with the help of the expr function.

We add some calculated KPIs to the dataframe:

In [20]:
%%time
// Current ratio = current assets / current liabilities
// Quick ratio = (cash + accounts receivable + short-term investments) / current liabilities
// Net profit margin = net profit / total revenue
// Inventory turnover = sales / inventory
val edgarAAPLYear1 = edgarAAPLYear
    .withColumn("Revenue",expr("coalesce(Revenues, SalesRevenueNet)"))
    .withColumn("QuickRatio",expr("(CashAndCashEquivalentsAtCarryingValue + AccountsReceivableNetCurrent + AvailableForSaleSecuritiesCurrent) / LiabilitiesCurrent"))
    .withColumn("CurrentRatio",expr("AssetsCurrent / LiabilitiesCurrent"))
    .withColumn("InventoryTurnover",expr("Revenue / InventoryNet"))
    .withColumn("NetProfitMargin",expr("NetIncomeLoss / Revenue"))
    .withColumn("SalesResearchRatio%",expr("ResearchAndDevelopmentExpense / Revenue *100"))
    .withColumn("NetIncomeResearchRatio%",expr("ResearchAndDevelopmentExpense / NetIncomeLoss * 100"))
    .drop($"Revenues")
    .drop($"SalesRevenueNet")

edgarAAPLYear1.display(100)


CPU times: user 0 ns, sys: 296 µs, total: 296 µs 
Wall Time: 137 ms
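
To see what `coalesce` and the chained `expr` formulas do, here is a self-contained sketch on a tiny hand-made DataFrame. The numbers are illustrative and not taken from Edgar; the column names follow the ones used above. One row reports `Revenues` and the other only `SalesRevenueNet`, so `coalesce` picks whichever is present. The sketch builds its own local session; in the notebook you would use the existing `spark`.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.expr

val spark = SparkSession.builder()
  .appName("KpiFormulaSketch")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

// Illustrative numbers only (not Edgar data); None becomes a null cell
val sample = Seq[(String, Option[Double], Option[Double], Double, Double, Double)](
  ("2001-12-31", Some(200.0), None,        50.0, 100.0,  80.0),
  ("2002-12-31", None,        Some(300.0), 60.0, 150.0, 100.0)
).toDF("date", "Revenues", "SalesRevenueNet", "NetIncomeLoss", "AssetsCurrent", "LiabilitiesCurrent")

val kpis = sample
  // take Revenues when it is present, otherwise fall back to SalesRevenueNet
  .withColumn("Revenue", expr("coalesce(Revenues, SalesRevenueNet)"))
  // later formulas may refer to columns added by earlier withColumn calls
  .withColumn("NetProfitMargin", expr("NetIncomeLoss / Revenue"))
  .withColumn("CurrentRatio", expr("AssetsCurrent / LiabilitiesCurrent"))
  .drop("Revenues", "SalesRevenueNet")
  .orderBy("date")

kpis.show(false)
```

If a parameter is missing for a year, the affected ratio is simply null for that row, so it is worth checking for nulls before feeding the KPIs into a model.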




## KPIs for % Change 
We can also calculate the percent change compared with the prior period with the help of the lag function. We do this for:
- NetIncomeLoss
- Revenue
- ResearchAndDevelopmentExpense

In [21]:
%%time
val windowSpec = Window.partitionBy("companyName").orderBy("date")
val netIncomeChangeFormula = ($"NetIncomeLoss" - lag("NetIncomeLoss", 1).over(windowSpec)) / lag("NetIncomeLoss", 1).over(windowSpec) * 100
val salesChangeFormula = ($"Revenue" - lag("Revenue", 1).over(windowSpec)) / lag("Revenue", 1).over(windowSpec) * 100
val researchChangeFormula = ($"ResearchAndDevelopmentExpense" - lag("ResearchAndDevelopmentExpense", 1).over(windowSpec)) / lag("ResearchAndDevelopmentExpense", 1).over(windowSpec) * 100

val edgarAAPLYear2 = edgarAAPLYear1
    .withColumn("NetIncomeChange%", netIncomeChangeFormula )  
    .withColumn("RevenueChange%", salesChangeFormula )  
    .withColumn("ResearchAndDevelopmentChange%", researchChangeFormula )

edgarAAPLYear2.display(100)



CPU times: user 0 ns, sys: 232 µs, total: 232 µs 
Wall Time: 134 ms
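
The effect of `lag` is easiest to see on a few rows. The following self-contained sketch uses Apple's yearly NetIncomeLoss values for 2007 to 2009 and keeps the previous value in a separate `Previous` column, which is an addition made here for illustration. It partitions by `identifier` and builds its own local session; in the notebook you would use the existing `spark`.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, lag}

val spark = SparkSession.builder()
  .appName("PercentChangeSketch")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

// Apple's yearly NetIncomeLoss for 2007 to 2009
val income = Seq(
  ("0000320193", "2007-09-29", 3495000000.0),
  ("0000320193", "2008-09-27", 6119000000.0),
  ("0000320193", "2009-09-26", 8235000000.0)
).toDF("identifier", "date", "NetIncomeLoss")

// lag(..., 1) looks one row back within the same company, ordered by date
val byCompany = Window.partitionBy("identifier").orderBy("date")
val previous = lag("NetIncomeLoss", 1).over(byCompany)

val change = income
  .withColumn("Previous", previous)
  .withColumn("NetIncomeChange%", (col("NetIncomeLoss") - previous) / previous * 100)
  .orderBy("date")

change.show(false)
```

The first row of each company has no prior period, so `lag` returns null there and the percent change is null as well. The yyyy-MM-dd format is what makes ordering by the `date` string chronologically correct.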




## Summary
Finally, here is the consolidated example, which wraps the logic explained above in a simple object so that we can reuse the functionality for different ticker symbols:

In [22]:
%%time
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.Row

object EdgarCompanyData {
    
    def getDataset(ticker:String):Dataset[Row] = {    
        // Edgar data selection
        val values = new CompanyEdgarValues(new CompanySelection().setTradingSymbol(ticker))
            .setFilter(new FilterYearly())
            .setParameterNames("NetIncomeLoss","OperatingIncomeLoss","ResearchAndDevelopmentExpense",
                "CashAndCashEquivalentsAtCarryingValue","AvailableForSaleSecuritiesCurrent","AccountsReceivableNetCurrent",
                "Revenues","SalesRevenueNet","InventoryNet","AssetsCurrent","LiabilitiesCurrent","Assets","EarningsPerShareBasic",
                "StockholdersEquity")

        // formulas
        val file = Utils.createTempFile(values.toCsv());
        val windowSpec = Window.partitionBy("identifier").orderBy("date")
        val netIncomeChangeFormula = ($"NetIncomeLoss" - lag("NetIncomeLoss", 1).over(windowSpec)) / lag("NetIncomeLoss", 1).over(windowSpec) * 100
        val salesChangeFormula = ($"Revenue" - lag("Revenue", 1).over(windowSpec)) / lag("Revenue", 1).over(windowSpec) * 100
        val researchChangeFormula = ($"ResearchAndDevelopmentExpense" - lag("ResearchAndDevelopmentExpense", 1).over(windowSpec)) / lag("ResearchAndDevelopmentExpense", 1).over(windowSpec) * 100

        val edgarYear = spark.read.format("csv")
            .option("delimiter", ";")
            .option("header", "true")
            .load(file.getAbsolutePath)
            .withColumn("Revenue",expr("coalesce(Revenues, SalesRevenueNet)"))
            .withColumn("QuickRatio",expr("(CashAndCashEquivalentsAtCarryingValue + AccountsReceivableNetCurrent + AvailableForSaleSecuritiesCurrent) / LiabilitiesCurrent"))
            .withColumn("CurrentRatio",expr("AssetsCurrent / LiabilitiesCurrent"))
            .withColumn("InventoryTurnover",expr("Revenue / InventoryNet"))
            .withColumn("NetProfitMargin",expr("NetIncomeLoss / Revenue"))
            .withColumn("SalesResearchRatio%",expr("ResearchAndDevelopmentExpense / Revenue *100"))
            .withColumn("NetIncomeResearchRatio%",expr("ResearchAndDevelopmentExpense / NetIncomeLoss * 100"))
            .withColumn("NetIncomeChange%", netIncomeChangeFormula )  
            .withColumn("RevenueChange%", salesChangeFormula )  
            .withColumn("ResearchAndDevelopmentChange%", researchChangeFormula )
            .drop($"Revenues")
            .drop($"SalesRevenueNet")
        
        return edgarYear
    }
}    
    
EdgarCompanyData.getDataset("AAPL").display(100)


CPU times: user 0 ns, sys: 295 µs, total: 295 µs 
Wall Time: 233 ms




And now we try another ticker symbol:

In [23]:
EdgarCompanyData.getDataset("MSFT").display(100)

