# Stoys (beta)

This is a beta version. More notebooks and documentation are coming next week...

Try it in an environment like:

```bash
python3 -m venv env
source env/bin/activate
python3 -m pip install --index-url https://test.pypi.org/simple/ --extra-index-url https://pypi.org/simple stoys
python3 -m pip install jupyterlab
jupyter lab
```

In a nutshell: Copy the first cell and then look at a few examples of the `.stoys` extension on DataFrame.


## Init Stoys

Always create the first cell as follows:


In [None]:
import stoys

stoys.init()


### Init Spark

Create a spark session as usual.

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()


### Download Data


In [None]:
!mkdir -p data_cache
![ -d data_cache/covid-19-data ] || git -C data_cache clone --filter=tree:0 --no-checkout https://github.com/owid/covid-19-data.git covid-19-data
!git -C data_cache/covid-19-data checkout 0447906e7fe406e6baf4f1b8f5db58f58b1d1bca


## Load Data

You can use SparkIO as in this example, but you don't have to. It is optional.


In [None]:
from stoys.spark.spark_io import SparkIO, SparkIOConfig

sparkIOConfig = SparkIOConfig.notebook.copy(
    input_paths=[
        'data_cache/covid-19-data/public/data/excess_mortality/excess_mortality.csv?sos-format=csv&header=true&inferSchema=true',
        'data_cache/covid-19-data/public/data/vaccinations/vaccinations.csv?sos-format=csv&header=true&inferSchema=true',
        'data_cache/covid-19-data/public/data/jhu/full_data.csv?sos-table_name=jhu_csv&sos-format=csv&header=true&inferSchema=true',
        'data_cache/covid-19-data/public/data/jhu/locations.csv?sos-format=csv&header=true&inferSchema=true',
    ],
    output_path = 'tmp/stoys_covid',
)

sparkIO = SparkIO(spark, sparkIOConfig)
spark.catalog.listTables()
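
The input paths above carry options in a query string. How SparkIO interprets them is not documented here. Judging by the names (and by `jhu_csv` showing up as a table name later), the `sos-` prefixed ones look like SparkIO's own options and the rest look like plain Spark reader options. A minimal sketch of that split in plain Python, under exactly that assumption (`split_input_path` is a hypothetical helper, not a Stoys function):

```python
from urllib.parse import parse_qsl, urlsplit


def split_input_path(input_path, prefix="sos-"):
    """Split 'dir/file.csv?sos-format=csv&header=true' into path and two option dicts."""
    parts = urlsplit(input_path)
    special_options, reader_options = {}, {}
    for key, value in parse_qsl(parts.query):
        if key.startswith(prefix):
            # Assumption: prefixed options are meant for SparkIO itself.
            special_options[key[len(prefix):]] = value
        else:
            # Assumption: everything else is handed to the Spark reader.
            reader_options[key] = value
    return parts.path, special_options, reader_options


path, sos_options, reader_options = split_input_path(
    "data_cache/covid-19-data/public/data/jhu/full_data.csv"
    "?sos-table_name=jhu_csv&sos-format=csv&header=true&inferSchema=true"
)
print(path)
print(sos_options)
print(reader_options)
```

Reading the paths this way makes it easier to see which part names the table and which part configures CSV parsing.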


## Explore


In [None]:
spark.table('excess_mortality_csv').limit(4).toPandas()


In [None]:
spark.table('vaccinations_csv').limit(4).toPandas()


## Magics - %%ssql

Or use cell magic `%%ssql` - Spark (or Stoys :)) SQL.


In [None]:
%%ssql?

### display_limit

Beware: The number of rows you see from `%%ssql` may be incorrect. By default it displays at most 1000 rows.

Let's see what happens if we disable it...


In [None]:
%%ssql --display_limit=0

SELECT * FROM excess_mortality_csv AS em JOIN vaccinations_csv AS v ON em.location = v.location


That either took a long time or killed your Spark driver. The default limit tries to prevent these issues!

Why was the join that big anyway?


## DqJoin

One can look at it with DqJoin.

BTW: There is incomplete code to discover joins and show suspicious joins automatically.


In [None]:
em_sdf = spark.table('excess_mortality_csv')
v_sdf = spark.table('vaccinations_csv')

em_sdf.stoys.dq_equi_join(v_sdf, ['location'], ['location'])


__TODO__: Speaking of DqJoin, it will also (soon) display a sample of the whole table here, not just the keys.
That is important because the primary key uniqueness rule would quickly reveal the actual problem here:
`location` is not unique. The combination of `location` and `date` is.

Basically this ...

In [None]:
em_sdf.stoys.dq(rules=[stoys.spark.dq.DqRules.uniqueRule('location')])
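
The size of the earlier join follows directly from the non-unique key: an equi-join emits, for every key value, the product of the matching row counts on both sides. A small sketch in plain Python with made-up rows (not the real dataset, and `equi_join_row_count` is a hypothetical helper) shows how fast that grows:

```python
from collections import Counter


def equi_join_row_count(left_keys, right_keys):
    """Number of rows an inner equi-join produces, given each side's key values."""
    left_counts = Counter(left_keys)
    right_counts = Counter(right_keys)
    # Every left row with a given key pairs with every right row with that key.
    return sum(n * right_counts[key] for key, n in left_counts.items())


# Made-up data: 2 locations, one row per day for 100 days on each side.
locations = ["Czechia", "Norway"]
days = range(100)
left = [(loc, day) for loc in locations for day in days]
right = [(loc, day) for loc in locations for day in days]

by_location = equi_join_row_count([loc for loc, _ in left], [loc for loc, _ in right])
by_location_and_date = equi_join_row_count(left, right)
print(by_location, by_location_and_date)  # 20000 200
```

Joining 200 rows with 200 rows on `location` alone gives 20000 rows here, while the composite key keeps it at 200.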


Let's try the join again ...


In [None]:
%%ssql mortality_with_vaccination

SELECT * FROM excess_mortality_csv AS em FULL JOIN vaccinations_csv AS v ON em.location = v.location AND em.date = v.date


Better now. Let's actually compute something ...


In [None]:
%%ssql covid_data

SELECT
    v.location,
    v.date,
    j.new_cases AS cases,
    j.new_deaths AS deaths,
    em.deaths_2020_all_ages AS excess_deaths,
    em.average_deaths_2015_2019_all_ages AS average_deaths,
    v.daily_vaccinations AS vaccinations
FROM excess_mortality_csv AS em
FULL JOIN vaccinations_csv AS v ON em.location = v.location AND em.date = v.date
FULL JOIN jhu_csv AS j ON v.location = j.location AND v.date = j.date


## API

There are a couple of ways to use Stoys: magics, Python API (matching Scala) and extension methods.

### Stoys magics

 1. `%%ssql` - runs a SQL query, registers the result and displays a few rows from it
 2. `%%ssql_dp` - append '_dp' to an existing '%%ssql' cell and re-run it to get a data profile
 3. `%%ssql_dq` - see the dq result of an existing query (It has to have the correct form! SELECT *, boolean_expr AS rule_name, ...)
 4. `%%ssql_aggsum` - aggregate summaries; renders differently the tables used for eyeballing aggregate numbers
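
The query form that `%%ssql_dq` expects (all columns, then one boolean expression per rule) can be tried out without Spark. The sketch below uses Python's built-in `sqlite3` purely as a stand-in engine; the table contents and the rule name `location__not_null` are made up for illustration, and the violation counting is not how Stoys computes its result:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE covid_data (location TEXT, cases INTEGER)")
conn.executemany(
    "INSERT INTO covid_data VALUES (?, ?)",
    [("Norway", 10), ("Norway", -3), ("Czechia", None), (None, 5)],
)

# The required form: SELECT *, boolean_expr AS rule_name, ...
dq_query = """
SELECT
    *,
    location IS NOT NULL AS location__not_null,
    (cases IS NULL) OR (cases >= 0) AS cases__non_negative
FROM covid_data
"""
cursor = conn.execute(dq_query)
columns = [column[0] for column in cursor.description]
rows = cursor.fetchall()

# The table has two columns; everything after them is a rule column.
rule_names = columns[2:]
# Count the rows for which each rule expression evaluated to false.
violations = {
    name: sum(1 for row in rows if not row[columns.index(name)])
    for name in rule_names
}
print(violations)
```

Each rule column is true for rows that pass, so a rule is violated exactly where its column is false.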

### Python API (matching scala)

It is basically the same, just without the 'io.' package name prefix.

Python api can be used as follows:

```python
from stoys.spark.dq import *

rules = [DqRules.uniqueRule("location")]
Dq.fromDataFrame(em_sdf).rules(rules).dqResult()
```

It has the same packages, classes, methods and order of arguments. All the data structures coming in and out are the same.
Their copy methods also behave the same as in Scala.

Note: You can even use `from stoys.utils.scala_compat import *` and use things like `Seq(...)` or `Map.empty`.
Such code may not be idiomatic Python but it allows copying Scala code to a Python kernel and back. That comes in handy at times.

### Extension methods

Pandas `DataFrame` and Spark `DataFrame` both have a registered extension method `.stoys`. The extensions have a slightly different API.
They are more concise since the DataFrame and SparkSession are already in the extended object.
The extensions also use optional arguments instead of the builder pattern. We will see which API style proves more popular.

```python
em_sdf.stoys.dq(rules=[DqRules.uniqueRule("location")])
```

Note: The extension will also be added to DataFrames from the koalas and spark.pandas packages.
Note: The extensions also have the same code in Scala.
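
To make the difference between the two API styles concrete, here is a toy sketch in plain Python. `CheckBuilder` and `check` are hypothetical stand-ins invented for this illustration; they are not Stoys classes and only mimic the shape of the two calls shown above:

```python
class CheckBuilder:
    """Hypothetical builder: configure step by step, then ask for the result."""

    def __init__(self, rows):
        self._rows = rows
        self._rules = []

    @classmethod
    def from_rows(cls, rows):
        return cls(rows)

    def rules(self, rules):
        self._rules = list(rules)
        return self  # returning self is what makes the calls chainable

    def result(self):
        # For each named rule, count the rows for which the predicate is false.
        return {
            name: sum(1 for row in self._rows if not predicate(row))
            for name, predicate in self._rules
        }


def check(rows, rules=None):
    """Hypothetical extension-style call: same work, optional arguments."""
    return CheckBuilder.from_rows(rows).rules(rules or []).result()


rows = [{"cases": 10}, {"cases": -3}, {"cases": None}]
rules = [("cases__non_negative", lambda r: r["cases"] is None or r["cases"] >= 0)]

builder_result = CheckBuilder.from_rows(rows).rules(rules).result()
optional_args_result = check(rows, rules=rules)
print(builder_result, optional_args_result)
```

Both styles do the same work; the optional-argument form just hides the chain behind one call.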


## Dq - Quality

Use `DqRules` or write your own `DqRule` and use it with the Python API or the extension methods.

In [None]:
from stoys.spark.dq import *

rules = [
    # Prefer using common rules
    DqRules.notNullRule('location'),
    # Or write a raw rule. If your rule is common consider creating your own collection of common rules.
    DqRule('cases__non_negative', '(cases IS NULL) OR (cases >= 0)', None, []),
    # They can also be read from configuration.
    DqRule.from_json('{"name": "deaths__non_negative", "expression": "(deaths IS NULL) OR (deaths >= 0)", "referenced_column_names": []}'),
]
fields = [
    DqRules.field('location', data_type_json='"string"', regexp='[A-Z][a-z]+'),
]

covid_data.stoys.dq(fields=fields, rules=rules)


## Dp - Profile

One can just use the `%%ssql_dp` magic. Or quickly append '_dp' to the existing '%%ssql' cell and re-run it.

Or use any other API form. For example, extensions.


In [None]:
spark.table('excess_mortality_csv').stoys.dp()


## AggSum - Aggregate Summaries

People are great at seeing patterns in the data. A lot of data issues are discovered by spot checking some aggregate values.
For that we have AggSum which can render those tables (hopefully) better and diff them with thresholds.


In [None]:
%%ssql_aggsum

SELECT
    location,
    SUM(cases) AS cases,
    SUM(deaths) AS deaths,
    SUM(excess_deaths) AS excess_deaths,
    SUM(average_deaths) AS average_deaths,
    SUM(vaccinations) AS vaccinations
FROM covid_data
GROUP BY location
ORDER BY location


## Diffing

As good as we are at seeing patterns, we can do much better with some help. Highlighting differences is really useful.
Stoys can diff things like data profiles and aggregate summaries. They help spot regressions between two runs very quickly.
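
To illustrate the idea of diffing with a threshold, here is a minimal sketch in plain Python. It is not the Stoys implementation; `diff_aggregates`, the 20% default threshold and the numbers are all made up for the example:

```python
def diff_aggregates(current, previous, threshold=0.2):
    """Return (key, metric, previous, current, relative change) for large changes."""
    flagged = []
    for key in sorted(current.keys() & previous.keys()):
        for metric, new_value in current[key].items():
            old_value = previous[key].get(metric)
            if not old_value or new_value is None:
                continue  # skip missing or zero baselines to avoid dividing by zero
            change = (new_value - old_value) / abs(old_value)
            if abs(change) > threshold:
                flagged.append((key, metric, old_value, new_value, round(change, 3)))
    return flagged


# Made-up numbers, not taken from the dataset.
q1 = {"Norway": {"cases": 1000, "deaths": 50}, "Czechia": {"cases": 4000, "deaths": 200}}
q2 = {"Norway": {"cases": 1100, "deaths": 20}, "Czechia": {"cases": 900, "deaths": 190}}

for row in diff_aggregates(q2, q1):
    print(row)
```

Only the changes larger than the threshold are reported, which is what makes a regression between two runs stand out.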


In [None]:
from IPython.core import display
from stoys.spark.dp import Dp
from stoys.ui.dp_ui import dp_result_to_html

em_by_continent_query_template = """
SELECT
  *
FROM excess_mortality_csv AS em
WHERE EXISTS (SELECT true FROM locations_csv AS l WHERE l.location = em.location AND l.continent = "{continent}")
"""

asia_em_sdf = spark.sql(em_by_continent_query_template.format(continent='Asia'))
europe_em_sdf = spark.sql(em_by_continent_query_template.format(continent='Europe'))
display.HTML(dp_result_to_html(asia_em_sdf.stoys.dp(), europe_em_sdf.stoys.dp()))


[TODO] Add better example of profile diff.

[TODO] Fix the profiler performance and improve the histogram.


We can also diff the aggregate summaries.

In [None]:
from IPython.core import display
from stoys.spark.aggsum import AggSum
from stoys.ui.aggsum_ui import aggsum_result_to_html

cd_aggsum_in_period_query_template = """
SELECT
    location,
    SUM(cases) AS cases,
    SUM(deaths) AS deaths,
    SUM(vaccinations) AS vaccinations
FROM covid_data
WHERE date BETWEEN '{start_date}' AND '{end_date}'
GROUP BY location
ORDER BY location
"""

cd_aggsum_q1_sdf = AggSum.fromSql(spark, cd_aggsum_in_period_query_template.format(start_date='2021-01-01', end_date='2021-03-31')).aggSumResult()
cd_aggsum_q2_sdf = AggSum.fromSql(spark, cd_aggsum_in_period_query_template.format(start_date='2021-04-01', end_date='2021-06-30')).aggSumResult()
display.HTML(aggsum_result_to_html(cd_aggsum_q2_sdf, cd_aggsum_q1_sdf))
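
The period boundaries passed to the query template are typed by hand. A small helper can derive them from the calendar instead, which rules out dates that do not exist. `quarter_bounds` is a hypothetical helper written for this example, not part of Stoys:

```python
import calendar
from datetime import date


def quarter_bounds(year, quarter):
    """Return the first and last day of a calendar quarter as ISO strings."""
    if quarter not in (1, 2, 3, 4):
        raise ValueError("quarter must be 1, 2, 3 or 4")
    first_month = 3 * (quarter - 1) + 1
    last_month = first_month + 2
    # monthrange returns (weekday of the first day, number of days in the month).
    last_day = calendar.monthrange(year, last_month)[1]
    start = date(year, first_month, 1)
    end = date(year, last_month, last_day)
    return start.isoformat(), end.isoformat()


print(quarter_bounds(2021, 1))  # ('2021-01-01', '2021-03-31')
print(quarter_bounds(2021, 2))  # ('2021-04-01', '2021-06-30')
```

The two strings can then be passed as `start_date` and `end_date` to `cd_aggsum_in_period_query_template.format(...)`.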


## Other Stoys Tools

There are many more things in the Stoys library which will be documented soon(-ish):
 1. `Reshape` is perhaps the most useful function in all of Stoys. Check out the `ReshapeConfig`!
 2. Spark export capable of exporting millions of files templated with styles and plots in one SQL query.
 3. The main components presented here can do a lot more, like type inference, dq result filtering, and many UI features.
 4. The graph component is coming. That one will integrate all of the above and bring a lot more on top of that.
 5. There is a JDBC database loader, a DAG runner with layered configs, and a number of Scala utilities.
 