In [1]:
import os
import logging

from py4j.protocol import Py4JJavaError
from pyspark.errors.exceptions.base import AnalysisException
from pyspark.sql import SparkSession, types

In [2]:
landing_date = "2025-09-27"
symbol = "ADAUSDT"

PROJECT_PREFIX = os.getenv("PROJECT_PREFIX")
PROJECT_PREFIX_UNDERSCORE = os.getenv("PROJECT_PREFIX_UNDERSCORE")
DATA_LAKE_BUCKET = os.getenv("DATA_LAKE_BUCKET")
ICEBERG_LOCK_TABLE = os.getenv("ICEBERG_LOCK_TABLE")

In [3]:
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

In [None]:
spark = (
    SparkSession.builder
    .appName("Transform Job - Pattern Two")
    .master("local[*]")
    .config(
        "spark.sql.extensions",
        "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    )
    .config("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog")
    .config(
        "spark.sql.catalog.glue_catalog.catalog-impl",
        "org.apache.iceberg.aws.glue.GlueCatalog",
    )
    .config("spark.sql.catalog.glue_catalog.warehouse", f"s3://{DATA_LAKE_BUCKET}/")
    .config(
        "spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO"
    )
    .config("spark.sql.catalog.glue_catalog.lock.table", f"{ICEBERG_LOCK_TABLE}")
    .config("spark.sql.defaultCatalog", "glue_catalog")
    .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
    .config("spark.sql.session.timeZone", "UTC")
    .config("spark.sql.parquet.enableVectorizedReader", "false")
    .config("spark.sql.columnVector.offheap.enabled", "false")
    .config("spark.memory.offHeap.enabled", "false")
    .config("spark.sql.catalog.glue_catalog.read.parquet.vectorization.enabled", "false")
    .config("spark.driver.memory", "2g")
    .config("spark.driver.extraJavaOptions", "-XX:MaxDirectMemorySize=1g")
    .config("spark.sql.codegen.wholeStage", "false")
    .config(
        "spark.jars.packages",
        ",".join([
            "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.7.1",
            "org.apache.iceberg:iceberg-aws-bundle:1.7.1",
        ])
    )
    .getOrCreate()
)

:: loading settings :: url = jar:file:/Users/anhtu/.pyenv/versions/3.11.11/envs/crypto-cloud/lib/python3.11/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/anhtu/.ivy2/cache
The jars for the packages stored in: /Users/anhtu/.ivy2/jars
org.apache.iceberg#iceberg-spark-runtime-3.5_2.12 added as a dependency
org.apache.iceberg#iceberg-aws-bundle added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-ae39abcb-298c-4d67-ae1e-f9f1f76e991a;1.0
	confs: [default]
	found org.apache.iceberg#iceberg-spark-runtime-3.5_2.12;1.7.1 in central
	found org.apache.iceberg#iceberg-aws-bundle;1.7.1 in central
downloading https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.7.1/iceberg-spark-runtime-3.5_2.12-1.7.1.jar ...
	[SUCCESSFUL ] org.apache.iceberg#iceberg-spark-runtime-3.5_2.12;1.7.1!iceberg-spark-runtime-3.5_2.12.jar (7980ms)
downloading https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-aws-bundle/1.7.1/iceberg-aws-bundle-1.7.1.jar ...
	[SUCCESSFUL ] org.apache.iceberg#iceberg-aws-bundle;1.7.1!iceberg-aws-bundle.jar (6558ms)
:: resolution report :: r

In [5]:
def table_exists(spark: SparkSession, database: str, table: str) -> bool:
    try:
        spark.catalog.getTable(f"{database}.{table}")
        return True
    except (AnalysisException, Py4JJavaError):
        return False


def round_half_up(x, decimals=2):
    if x is None:
        return None
    factor = 10**decimals
    return float(int(x * factor + 0.5)) / factor


def calc_ema(value, state):
    if value is None:
        return None
    prev, buffer, period, k = (
        state["prev"],
        state["buffer"],
        state["period"],
        state["k"],
    )
    if prev is None:
        buffer.append(value)
        if len(buffer) == period:
            ema = sum(buffer) / len(buffer)
        else:
            ema = None
    else:
        ema = (value - prev) * k + prev

    state["prev"] = ema
    return ema


def make_ema_in_chunks(prev_ema7, prev_ema20):
    def ema_in_chunks(iterator):
        ema_configs = {
            "ema7": {
                "period": 7,
                "k": 2 / (7 + 1),
                "prev": prev_ema7,
                "buffer": [],
            },
            "ema20": {
                "period": 20,
                "k": 2 / (20 + 1),
                "prev": prev_ema20,
                "buffer": [],
            },
        }

        for pdf in iterator:
            ema7, ema20 = [], []
            for p in pdf["close_price"]:
                price = float(p)
                e7 = calc_ema(price, ema_configs["ema7"])
                ema7.append(round_half_up(e7, 4) if e7 is not None else None)
                e20 = calc_ema(price, ema_configs["ema20"])
                ema20.append(round_half_up(e20, 4) if e20 is not None else None)

            pdf["ema7"] = ema7
            pdf["ema20"] = ema20
            pdf = pdf[[*pdf.columns[:-2], "ema7", "ema20"]]
            yield pdf

    logger.info(f"Using previous EMA values: ema7={prev_ema7}, ema20={prev_ema20}")
    return ema_in_chunks

In [6]:
logger.info(f"Transforming symbol={symbol} for date={landing_date}")

INFO:__main__:Transforming symbol=ADAUSDT for date=2025-09-27


In [7]:
serving_db = f"{PROJECT_PREFIX_UNDERSCORE}_serving_db"
klines_table = "klines"
sql_stmt = f"""
select * from {serving_db}.{klines_table}
where landing_date = DATE('{landing_date}') AND symbol = '{symbol}'
"""
df_sorted = (
    spark.sql(sql_stmt)
    .coalesce(1)  # one partition, not shuffle
    .sortWithinPartitions("group_id")
)

logger.info(f"Input rows: {df_sorted.count()}")

INFO:__main__:Input rows: 96                                                    


In [8]:
schema = types.StructType(
    [
        *df_sorted.schema.fields,  # keep all original fields
        types.StructField("ema7", types.DoubleType(), True),
        types.StructField("ema20", types.DoubleType(), True),
    ]
)

In [9]:
pattern_two_table = "pattern_two"
if table_exists(spark, serving_db, pattern_two_table):
    sql_stmt = f"""
    select ema7, ema20 from {serving_db}.{pattern_two_table}
    where landing_date = date_sub(DATE('{landing_date}'), 1) AND symbol = '{symbol}'
    order by group_id desc
    limit 1
    """
    row = spark.sql(sql_stmt).first()
    prev_ema7, prev_ema20 = (row["ema7"], row["ema20"]) if row else (None, None)
else:
    prev_ema7, prev_ema20 = None, None

ema_in_chunks_with_state = make_ema_in_chunks(prev_ema7, prev_ema20)

INFO:__main__:Using previous EMA values: ema7=None, ema20=None


In [10]:
df = df_sorted.mapInPandas(ema_in_chunks_with_state, schema)

In [11]:
df.createOrReplaceTempView("temp")


df = spark.sql("""
with cte as (
    select
        *,
        case 
            when ema7 > ema20 then 'uptrend' 
            when ema7 < ema20 then 'downtrend' 
            else NULL 
        end as trend,
        LAG(open_price, 1) over(order by group_id) as open_price_prev,
        LAG(close_price, 1) over(order by group_id) as close_price_prev
    from temp
)
select
    group_id,
    group_date,
    open_time,
    open_price,
    high_price,
    low_price,
    close_price,
    volume,
    close_time,
    landing_date,
    symbol,
    ema7,
    ema20,
    trend,
    case 
        when close_price_prev < open_price_prev
            and close_price > open_price
            and open_price < close_price_prev
            and close_price > open_price_prev
            and trend = 'downtrend'
        then 'bullish engulfing'
        when close_price_prev > open_price_prev
            and close_price < open_price
            and open_price > close_price_prev
            and close_price < open_price_prev
            and trend = 'uptrend'
        then 'bearish engulfing'
        else NULL
    end as pattern
from cte
""")

logger.info(f"Output rows: {df.count()}")

INFO:__main__:Output rows: 96                                                   


In [12]:
if table_exists(spark, serving_db, pattern_two_table):
    df.writeTo(f"{serving_db}.{pattern_two_table}").overwritePartitions()
else:
    df.writeTo(f"{serving_db}.{pattern_two_table}").tableProperty(
        "format-version", "2"
    ).partitionedBy("symbol", "landing_date").createOrReplace()

logger.info("Transform job completed successfully.")

25/11/19 23:08:07 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/11/19 23:08:07 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
INFO:__main__:Transform job completed successfully.                             


In [13]:
serving_db = f"{PROJECT_PREFIX_UNDERSCORE}_serving_db"
pattern_two_table = "pattern_two"
spark.sql(f"""
select * from {serving_db}.{pattern_two_table} where pattern is not null
""").show(20, truncate=False)

                                                                                

+--------+-------------------+----------------+----------+----------+---------+-----------+--------+----------------+------------+-------+------+------+---------+-----------------+
|group_id|group_date         |open_time       |open_price|high_price|low_price|close_price|volume  |close_time      |landing_date|symbol |ema7  |ema20 |trend    |pattern          |
+--------+-------------------+----------------+----------+----------+---------+-----------+--------+----------------+------------+-------+------+------+---------+-----------------+
|1954413 |2025-09-27 11:15:00|1758971700405060|0.7837    |0.7853    |0.7837   |0.785      |121594.4|1758972591323187|2025-09-27  |ADAUSDT|0.7835|0.784 |downtrend|bullish engulfing|
|1954445 |2025-09-27 19:15:00|1759000502967967|0.7803    |0.7823    |0.78     |0.7821     |95233.0 |1759001384883232|2025-09-27  |ADAUSDT|0.7808|0.7817|downtrend|bullish engulfing|
+--------+-------------------+----------------+----------+----------+---------+-----------+----

In [14]:
spark.sql(f"""
select count(*) from {serving_db}.{pattern_two_table}
""").show()

+--------+
|count(1)|
+--------+
|      96|
+--------+

