## ESMA and the department
- Organigramme
- What we do: examples of analysis
- What we do: Medallion architecture
- What we do: statistics

## Organigramme

## Analysis examples:
We collaborate with other departments on the data part of reports such as the Trends, Risks and Vulnerabilities (TRV) report:
https://www.esma.europa.eu/document/trends-risks-and-vulnerabilities-trv-report-no-2-2024

The TRV contains regular charts as well as ad-hoc analyses, such as the article on gas derivatives:
https://www.esma.europa.eu/document/trv-article-eu-natural-gas-derivatives-markets-risks-and-trends

## Medallion architecture:
- https://www.databricks.com/glossary/medallion-architecture

## Statistics
- Example from the ECB data portal: https://data.ecb.europa.eu/data/datasets/BSI/BSI.M.U2.Y.U.A20T.A.I.U2.2250.Z01.A

**Note:** The outlier methods we will develop would fit around the topics above: as alerts when producing statistics and, if time allows, as a way to explore the most granular gold tables when producing articles or reports.

# Intro to Databricks
- Scheduled tasks
- Pyspark and big data
- Outlier method example

## Scheduled tasks: 
Regular tasks such as maintaining the medallion architecture and computing statistics can be automated, provided the data behaves as expected.

We need to keep this in mind for the outlier methods: they should work both manually ("I want to explore this granular dataset") and on a schedule ("after computing X, I run this and get alerts").
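
One way to support both modes is a single entry point that returns a summary when called interactively and raises an error when run as a scheduled task, so that the failed run acts as the alert. The sketch below is only an illustration: `run_outlier_check`, its `scheduled` and `max_flagged` parameters and the `value_3sd` flag column are hypothetical names, and pandas is used only to keep the example small.

```python
import pandas as pd


def run_outlier_check(df: pd.DataFrame, flag_col: str,
                      scheduled: bool = False, max_flagged: int = 0) -> dict:
    """Hypothetical helper: summarise a boolean outlier flag column."""
    summary = {"rows": len(df), "flagged": int(df[flag_col].sum())}
    if scheduled and summary["flagged"] > max_flagged:
        # assumption: in a scheduled run, a failed task is what raises the alert
        raise ValueError(f"{summary['flagged']} outliers flagged in '{flag_col}'")
    return summary


# toy data with one flagged record (made up for illustration)
toy = pd.DataFrame({"value": [0.1, 0.2, 9.9], "value_3sd": [False, False, True]})

# manual use: inspect the summary
manual_summary = run_outlier_check(toy, "value_3sd")

# scheduled use: the same call fails loudly when outliers are found
try:
    run_outlier_check(toy, "value_3sd", scheduled=True)
    alert_raised = False
except ValueError:
    alert_raised = True
```

The same pattern would apply to a Spark dataframe by replacing the pandas sum with a filtered count.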

## Pyspark
Preferred way of working (as opposed to pandas dataframes, for example), for several reasons:
- Parallelization: makes use of big data features such as having several workers per cluster.
- Memory use: does not store all the data in a single 'computer' within the cluster.
- Lazy evaluation: transformations are only computed when an action needs their result.

Documentation: 
  - https://spark.apache.org/docs/latest/api/python/index.html
  - https://medium.com/@john_tringham/spark-concepts-simplified-lazy-evaluation-d398891e0568


In [None]:
# some examples
import pandas as pd
import numpy as np

# pyspark already loaded when opening a notebook, but these are the useful functions we'll use most of the time
import pyspark.sql.functions as f

# first let's make a fake pandas dataframe and turn it into spark

# Define the number of rows
num_rows = 100000

# Generate random years
years = np.random.choice(range(1900, 2026), num_rows)

# Generate random countries from the list
countries = np.random.choice(['Japan', 'Nepal', 'India'], num_rows)

# Generate random values, multiplied by another random number to add noise
values = np.random.rand(num_rows) * np.random.rand(num_rows)

# Create a DataFrame
df = pd.DataFrame({
    'year': years,
    'value': values,
    'country': countries
})

df.head()

   year     value country
0  1992  0.009401   India
1  2024  0.395331   Nepal
2  1928  0.065743   India
3  1950  0.241188   Nepal
4  2008  0.084063   Nepal


In [4]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]").appName("SparkSQL.com").getOrCreate()
dfs = spark.createDataFrame(df)

# see the type of object: pyspark connect dataframe



In [5]:
# 'running' it just shows the schema
dfs


DataFrame[year: bigint, value: double, country: string]

In [6]:
# explaining it shows what it is: essentially an SQL query
# in this case reading a 'local' table (the pandas dataframe just created, stored in the driver)
dfs.explain()

== Physical Plan ==
*(1) Scan ExistingRDD[year#0L,value#1,country#2]




In [7]:
# altering the dataframe does not trigger any action (lazy evaluation)

# let's add a couple of columns
dfs = dfs.withColumn('value_thousands', f.col('value')/1000)

dfs = dfs.withColumn('country_upper', f.upper('country'))

dfs

DataFrame[year: bigint, value: double, country: string, value_thousands: double, country_upper: string]

In [9]:
# the computations above are only triggered when they are needed
# for example, by an action such as show() (or display() in Databricks)
dfs.sample(0.001).show()


+----+--------------------+-------+--------------------+-------------+
|year|               value|country|     value_thousands|country_upper|
+----+--------------------+-------+--------------------+-------------+
|1922|  0.3626450669113175|  Nepal|3.626450669113175E-4|        NEPAL|
|1972|  0.2301452471932439|  Japan|2.301452471932439E-4|        JAPAN|
|1939|  0.2241952003370295|  India|2.241952003370295E-4|        INDIA|
|2021|  0.6892271723973562|  India|6.892271723973562E-4|        INDIA|
|1929| 0.27128380141492636|  Nepal|2.712838014149264E-4|        NEPAL|
|1983| 0.19938612442183518|  Nepal|1.993861244218351...|        NEPAL|
|1942|  0.2033969439462529|  Nepal|2.033969439462529E-4|        NEPAL|
|1961|  0.3873328151303006|  Japan|3.873328151303006E-4|        JAPAN|
|1962|  0.0648391518169159|  India| 6.48391518169159E-5|        INDIA|
|1915| 0.15921067049825194|  Japan|1.592106704982519...|        JAPAN|
|1921| 0.37760778453827665|  Nepal|3.776077845382766...|        NEPAL|
|1926|

- Ideally, all the code we develop would be in PySpark, which can also run on a local computer, without a big data cluster.
- If that is problematic, we can also write it in Polars and I then convert it to PySpark internally. https://pola.rs/
- For example, Polars with_columns works almost identically to PySpark withColumn above:
https://docs.pola.rs/api/python/stable/reference/dataframe/api/polars.DataFrame.with_columns.html

- If, in certain sections, we need to use pandas (for example, for a fancier outlier model), we can optionally convert to pandas and then back to Spark. This can be dangerous with big datasets, because toPandas() collects all the data on the driver, but it can be an option for the users.


## Outlier method example

In [None]:
# let's run a pyspark function on it, to flag outliers
from functools import reduce #for eqnullsafe joins, but we can simplify this

def add_outlier_thresholds(data, numbercol, groupbycols=None, showstats=False, use_logs: bool=False):
    """
    Identifies outliers in the specified column based on whether they exceed the median by
    over 3 or 4 standard deviations (computed on the logarithm of the absolute value when
    use_logs is True).

    Parameters
    ----------
    data: pyspark.sql.DataFrame
        The dataframe containing the data for which to compute the thresholds.
    numbercol: str
        Name of the column for which to compute the thresholds.
    groupbycols: str or list, Optional
        Name(s) of column(s) based on which to group the data to generate different
        thresholds for different groups.
    showstats: bool, default False
        If True, prints a count of the number of outliers identified.
    use_logs: bool, default False
        If True, the thresholds are computed on, and compared with, the logarithm of
        the absolute value of numbercol.

    Returns
    -------
    pyspark.sql.DataFrame:
        The original dataframe with two additional columns, one for the 3sd metric and
        another for 4sd. The columns are populated with True if the record is identified
        as an outlier.
    """

    # accept a single column name, a list of names, or nothing
    if groupbycols is None:
        groupbycols = []
    elif isinstance(groupbycols, str):
        groupbycols = [groupbycols]

    # new column names
    col3sd = numbercol + '_3sd'
    col4sd = numbercol + '_4sd'

    # group also by date to prevent obfuscating metrics across dates
    #groupbycols += ['AS_OF_DATE'] # unused in this example

    # metric used both to compute the thresholds and to compare each record with them
    metric = f.col(numbercol).cast('double')
    if use_logs:
        metric = f.log(f.abs(metric))

    # compute median and standard deviation per group
    stats = (data
             .groupBy(*groupbycols)
             .agg(
                 f.median(metric).alias('median'),
                 f.stddev(metric).alias('stddev')
             )
             .withColumn('3sd', f.col('median') + f.col('stddev')*3 )
             .withColumn('4sd', f.col('median') + f.col('stddev')*4 ))

    # join threshold data
    if groupbycols:
        # rename the group keys in stats so the join does not duplicate column names
        keycols = ['_key_' + c for c in groupbycols]
        stats = stats.select(*[f.col(c).alias(k) for c, k in zip(groupbycols, keycols)],
                             'median', 'stddev', '3sd', '4sd')

        # null-safe equality, so records with null group keys still get their thresholds
        condition = reduce(lambda a, b: a & b,
                           [f.col(c).eqNullSafe(f.col(k)) for c, k in zip(groupbycols, keycols)])
        data = data.join(stats, on=condition, how='left').drop(*keycols)

    else: # stats has a single row: attach it to every record
        data = data.crossJoin(stats)

    # create columns for values exceeding 3 and 4 standard deviations and drop stats columns
    data = (data
        .withColumn(col3sd, f.when(metric > f.col('3sd'), f.lit(True)).otherwise(f.lit(False)))
        .withColumn(col4sd, f.when(metric > f.col('4sd'), f.lit(True)).otherwise(f.lit(False)))
        .drop('median', 'stddev', '3sd', '4sd'))

    # print number of outliers detected
    if showstats:
        print(f"Number of over-3sd outliers: {data.filter(f.col(col3sd)).count()}")
        print(f"Number of over-4sd outliers: {data.filter(f.col(col4sd)).count()}")

    return data

In [12]:
df_test = add_outlier_thresholds(dfs, 'value', groupbycols=['country'], showstats=True)

#df_test.display()



Number of over-3sd outliers: 1213


Number of over-4sd outliers: 0
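
To sanity-check the rule itself, the same median + k * stddev thresholds can be reproduced on a tiny pandas dataframe where the expected flags are easy to work out by hand. This is a sketch on made-up data, not part of the function above; the `A` and `B` group labels are arbitrary.

```python
import pandas as pd

# toy data: group A has 11 ones and one 100, group B has 19 ones and one 100
toy = pd.DataFrame({
    "country": ["A"] * 12 + ["B"] * 20,
    "value": [1.0] * 11 + [100.0] + [1.0] * 19 + [100.0],
})

grouped = toy.groupby("country")["value"]
median = grouped.transform("median")
stddev = grouped.transform("std")  # sample standard deviation, as Spark's stddev

# flag values above median + 3 and median + 4 standard deviations
toy["value_3sd"] = toy["value"] > median + 3 * stddev
toy["value_4sd"] = toy["value"] > median + 4 * stddev

n_3sd = int(toy["value_3sd"].sum())
n_4sd = int(toy["value_4sd"].sum())
```

Here the value 100 is flagged at 3sd in both groups but at 4sd only in group B: in the smaller group A the extreme value inflates the standard deviation enough to push the 4sd threshold above 100. This is worth keeping in mind when choosing a default outlier model for small groups.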


# Next steps
- Jordi to share notebook
- Test with fake data
- Test packages (pyspark, polars), default outlier model

# Next meeting(s)
- Prioritization
- Introduction to prospectus (https://www.esma.europa.eu/issuer-disclosure/prospectus) and its dataset (https://registers.esma.europa.eu/publication/searchRegister?core=esma_registers_priii_securities)

In [0]:
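def melisa_outliers(data,
                    min_filter,
                    min_date,
                    function = add_outlier_thresholds):
    # sketch only: min_filter and min_date are placeholders that are not used yet

    if function == 'random_forest_regressor':
        # convert to pandas for models that are not available in PySpark;
        # this collects all the rows on the driver, so it is risky with big datasets
        df = data.toPandas()

        # pandas block

        # convert back to a Spark dataframe
        data = spark.createDataFrame(df)

    return data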