In [1]:
from pyspark.sql import functions
from pyspark.sql.functions import when, count, isnan, col
from datetime import datetime as dt

In [2]:
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder.master("spark://pop-os.localdomain:7077")\
    .appName("EWMA.com")\
    .config("spark.jars", "mysql-connector-j-8.0.33.jar")\
    .config("spark.driver.memory", "8g")\
    .config("spark.executor.memory", "14g").getOrCreate()

23/11/01 17:07:08 WARN Utils: Your hostname, pop-os resolves to a loopback address: 127.0.1.1; using 192.168.61.172 instead (on interface wlo1)
23/11/01 17:07:08 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
23/11/01 17:07:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [4]:
spark

In [5]:
import configparser
config_file_path = "/opt/spark/conf/spark-config.conf"
# Read secrete from config
config = configparser.ConfigParser()
config.read(config_file_path)

['/opt/spark/conf/spark-config.conf']

In [6]:
def read_from_mysql(spark, table_name):
    dbDriver = config.get("db", "driver")
    dbUrl = config.get("db", "url")
    dbUsername = config.get("db", "username")
    dbPassword = config.get("db", "password")
    return (
        spark.read.format("jdbc")
        .option("driver", dbDriver)
        .option("url", dbUrl)
        .option("dbtable", table_name)
        .option("user", dbUsername)
        .option("password", dbPassword)
        .load()
    )

In [7]:
df = read_from_mysql(spark, "Sales")

In [8]:
df.show()

                                                                                

+-------+----------+-------+
|sale_id| sale_date|revenue|
+-------+----------+-------+
|      1|2022-01-31| 651.15|
|      2|2022-02-28| 761.09|
|      3|2022-03-31| 654.18|
|      4|2022-04-30| 987.64|
|      5|2022-05-31| 975.64|
|      6|2022-06-30| 915.30|
|      7|2022-07-31| 649.55|
|      8|2022-08-31| 501.87|
|      9|2022-09-30| 560.70|
|     10|2022-10-31| 720.50|
|     11|2022-11-30| 807.34|
|     12|2022-12-31| 980.20|
+-------+----------+-------+



In [9]:
df.createOrReplaceTempView("ewma_table")

In [10]:
ewma_query = """SELECT
  t1.sale_id,
  t1.sale_date,
  t1.revenue,
  CASE
    WHEN t2.prev_ema IS NULL THEN t1.revenue
    ELSE 0.2 * t1.revenue + (1 - 0.2) * t2.prev_ema
  END AS ewma
FROM (
  SELECT
    t1.sale_id,
    t1.sale_date,
    t1.revenue,
    t1.sale_id - 1 AS prev_id
  FROM ewma_table t1
) t1
LEFT JOIN (
  SELECT
    sale_id AS prev_id,
    0.2 * revenue AS prev_ema
  FROM ewma_table
) t2 ON t1.prev_id = t2.prev_id
ORDER BY t1.sale_date;"""


In [11]:
spark.sql(ewma_query).show()

+-------+----------+-------+--------+
|sale_id| sale_date|revenue|    ewma|
+-------+----------+-------+--------+
|      1|2022-01-31| 651.15|651.1500|
|      2|2022-02-28| 761.09|256.4020|
|      3|2022-03-31| 654.18|252.6104|
|      4|2022-04-30| 987.64|302.1968|
|      5|2022-05-31| 975.64|353.1504|
|      6|2022-06-30| 915.30|339.1624|
|      7|2022-07-31| 649.55|276.3580|
|      8|2022-08-31| 501.87|204.3020|
|      9|2022-09-30| 560.70|192.4392|
|     10|2022-10-31| 720.50|233.8120|
|     11|2022-11-30| 807.34|276.7480|
|     12|2022-12-31| 980.20|325.2144|
+-------+----------+-------+--------+



In [12]:
# 
ewma_query_lead_lag = """
SELECT
  t1.sale_id,
  t1.sale_date,
  t1.revenue,
  CASE
    WHEN t2.prev_ema IS NULL THEN t1.revenue
    ELSE 0.2 * t1.revenue + (1 - 0.2) * t2.prev_ema
  END AS ewma
FROM (
  SELECT
    t1.sale_id,
    t1.sale_date,
    t1.revenue,
    LAG(ewma) OVER (PARTITION BY sale_date ORDER BY sale_date) AS prev_ema
  FROM (
    SELECT
      t1.sale_id,
      t1.sale_date,
      t1.revenue,
      0.2 * t1.revenue AS ewma
    FROM ewma_table t1
  ) t1
) t1
LEFT JOIN (
  SELECT
    sale_id AS next_id,
    LEAD(ewma) OVER (PARTITION BY sale_date ORDER BY sale_date) AS prev_ema
  FROM (
    SELECT
      t2.sale_id,
      t2.sale_date,
      0.2 * t2.revenue AS ewma
    FROM ewma_table t2
  ) t2
) t2 ON t1.sale_id = t2.next_id
ORDER BY t1.sale_date;
"""

In [13]:
spark.sql(ewma_query_lead_lag).show()

+-------+----------+-------+--------+
|sale_id| sale_date|revenue|    ewma|
+-------+----------+-------+--------+
|      1|2022-01-31| 651.15|651.1500|
|      2|2022-02-28| 761.09|761.0900|
|      3|2022-03-31| 654.18|654.1800|
|      4|2022-04-30| 987.64|987.6400|
|      5|2022-05-31| 975.64|975.6400|
|      6|2022-06-30| 915.30|915.3000|
|      7|2022-07-31| 649.55|649.5500|
|      8|2022-08-31| 501.87|501.8700|
|      9|2022-09-30| 560.70|560.7000|
|     10|2022-10-31| 720.50|720.5000|
|     11|2022-11-30| 807.34|807.3400|
|     12|2022-12-31| 980.20|980.2000|
+-------+----------+-------+--------+



In [14]:
spark.stop()