# <center> <img src="../labs/img/ITESOLogo.png" alt="ITESO" width="480" height="130"> </center>
# <center> **Departamento de Electrónica, Sistemas e Informática** </center>
---
## <center> **Big Data** </center>
---
### <center> **Spring 2025** </center>
---
### <center> **Streamer Structure for Final Project** </center>
### <center> **Par de Foraneos** </center>


In [None]:
import findspark
findspark.init()
import ta
from foraneos.utils_proyectofinal_copy import resample_and_aggregate
from foraneos.utils_proyectofinal_copy import SparkUtils as SpU

## Spark Session creation


In [None]:
from pyspark.sql import SparkSession

# Per-user endpoints. The values are used as the Spark master host and as the
# Kafka bootstrap address (presumably Docker container hostnames -- confirm).
SPARK_SERVER = {
    'Konrad': '672e28abb623',
    'Aaron': 'a5ab6bdab4b3',
}
KAFKA_SERVER = {
    'Konrad': 'dee5c9cc3710:9093',
    'Aaron': '69b1b3611d90:9093',
}
# Selects which entry of the two tables above is used by this notebook.
current_user = 'Konrad'

# Build (or reuse) the session against the selected standalone master and pull
# in the Kafka connector package for Structured Streaming.
spark = (
    SparkSession.builder
    .appName("SparkSQLStructuredStreaming-Kafka")
    .master("spark://{}:7077".format(SPARK_SERVER[current_user]))
    .config("spark.ui.port", "4040")
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.13:3.5.4")
    .getOrCreate()
)

# Use 5 shuffle partitions for the SQL/streaming aggregations in this session.
spark.conf.set("spark.sql.shuffle.partitions", "5")

sc = spark.sparkContext

## Kafka Stream creation

In [None]:
# One streaming Kafka reader per topic (stock_topic0 .. stock_topic3), all
# connected to the broker of the currently selected user.
streamer_lines = [
    (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "{}".format(KAFKA_SERVER[current_user]))
        .option("subscribe", f"stock_topic{topic_idx}")
        .load()
    )
    for topic_idx in range(4)
]


In [None]:
# Schema of the enriched bars: bar timestamp, OHLC prices, four technical
# indicators and the five previous closes (close_lag_1 .. close_lag_5).
result_schema = SpU.generate_schema(
    [
        ("timestamp", "timestamp"),
        ("open", "float"),
        ("high", "float"),
        ("low", "float"),
        ("close", "float"),
        ("williams_r", "float"),
        ("rsi", "float"),
        ("ultimate_osc", "float"),
        ("ema", "float"),
        *[(f"close_lag_{lag}", "float") for lag in range(1, 6)],
    ]
)





## Transform binary data into string

In [None]:
from pyspark.sql.functions import col, split, window, min, max, first, last
from pyspark.sql.types import DoubleType, TimestampType
from pyspark.sql.functions import pandas_udf

from pyspark.sql.functions import max_by, min_by


def add_indicators(ohlc_pdf):
    """Return a copy of the OHLC bars with indicators and lagged closes added.

    The ``ta`` indicators and ``Series.shift`` work on pandas Series, not on
    Spark ``Column`` objects, so they cannot be used inside
    ``DataFrame.withColumn``. They also need the history of previous bars,
    which a streaming DataFrame does not expose (Spark rejects non-time-based
    window functions such as lag on streaming data). Call this helper on
    materialised bars instead, e.g. ``spark.read.parquet(...).toPandas()`` or
    the batch handed to ``foreachBatch``.

    Args:
        ohlc_pdf: pandas DataFrame with ``timestamp``, ``high``, ``low`` and
            ``close`` columns, one row per bar.

    Returns:
        A new pandas DataFrame ordered by ``timestamp`` with the columns
        ``williams_r``, ``rsi``, ``ultimate_osc``, ``ema`` and
        ``close_lag_1`` .. ``close_lag_5`` appended.
    """
    # sort_values returns a new frame, so the caller's data is not modified.
    bars = ohlc_pdf.sort_values("timestamp").reset_index(drop=True)
    high, low, close = bars["high"], bars["low"], bars["close"]

    bars["williams_r"] = ta.momentum.WilliamsRIndicator(
        high=high, low=low, close=close
    ).williams_r()
    bars["rsi"] = ta.momentum.RSIIndicator(close=close).rsi()
    bars["ultimate_osc"] = ta.momentum.UltimateOscillator(
        high=high, low=low, close=close
    ).ultimate_oscillator()
    bars["ema"] = ta.trend.EMAIndicator(close=close, window=14).ema_indicator()
    for lag in range(1, 6):
        bars[f"close_lag_{lag}"] = close.shift(lag)
    return bars


streamer_df = []

for i in range(4):
    # Kafka delivers the payload as binary; decode it and split the
    # comma-separated record into timestamp (0), company (1) and close (2).
    fields = split(col("value").cast("string"), ",")
    df = streamer_lines[i].select(
        fields.getItem(0).cast(TimestampType()).alias("timestamp"),
        fields.getItem(1).alias("company"),
        fields.getItem(2).cast(DoubleType()).alias("close"),
    )
    df.printSchema()

    # NOTE(review): `company` is parsed but is not part of the grouping key;
    # this assumes each topic carries a single company -- confirm.
    # `min` / `max` below are the Spark SQL functions imported in this cell,
    # not the Python builtins.
    aggregated_df = (
        df
        .withWatermark("timestamp", "5 minutes")
        .groupBy(window("timestamp", "5 minutes"))  # 5-minute tumbling window
        .agg(
            # Aggregate the parsed `close` price: the raw Kafka `value`
            # column no longer exists after the select above.
            # min_by/max_by pick the price at the earliest/latest event time;
            # first()/last() depend on row order, which is not guaranteed
            # after a shuffle.
            min_by(col("close"), col("timestamp")).alias("open"),
            max("close").alias("high"),
            min("close").alias("low"),
            max_by(col("close"), col("timestamp")).alias("close"),
        )
        # Expose the window start as the bar timestamp so the columns line up
        # with the first fields of `result_schema`.
        .select(col("window.start").alias("timestamp"), "open", "high", "low", "close")
    )

    # Only the OHLC bars are streamed; indicators are computed afterwards with
    # add_indicators() on materialised data (see its docstring for why).
    streamer_df.append(aggregated_df)
    aggregated_df.printSchema()



### Watermarking to handle late-arriving events

### Sink configuration

In [None]:
# Handles of the started streaming queries, one per entry of streamer_df.
query = []

# Alternative sink (disabled): write each stream to parquet with its own
# checkpoint directory instead of printing to the console.
# for i in range(4):
#     query.append(
#         streamer_df[i] \
#         .writeStream \
#         .outputMode("append") \
#         .trigger(processingTime='120 seconds') \
#         .format("parquet") \
#         .option("path", f"/home/jovyan/notebooks/data/final_project_ParDeForaneos/output{i}/")
#         .option("checkpointLocation", f"/home/jovyan/notebooks/data/final_project_ParDeForaneos/checkpoints/stock_topic{i}") \
#         .start()
#     )

# Active sink: print the rows of each stream to the console in append mode,
# triggering a micro-batch every 30 seconds.
for topic_idx in range(4):
    console_writer = (
        streamer_df[topic_idx]
        .writeStream
        .outputMode("append")
        .trigger(processingTime='30 seconds')
        .format("console")
    )
    query.append(console_writer.start())

    # query[topic_idx].awaitTermination(100)


In [None]:
# Stop every streaming query that was actually started. Iterating over the
# list itself (instead of indexing with range(4)) avoids an IndexError when
# fewer than four queries are present, e.g. after a failed start().
for running_query in query:
    running_query.stop()

In [None]:
# Load the parquet output of the first stream for inspection. Read the output
# directory rather than one part file: the part-file name contains a generated
# UUID, so a hard-coded file name is not expected to exist after a re-run.
df = spark.read.parquet("/home/jovyan/notebooks/data/final_project_ParDeForaneos/output0/")

In [None]:
# Print a preview of `df` to the notebook output.
df.show()


In [None]:
# Shut down the SparkContext obtained from the session (`sc = spark.sparkContext`).
sc.stop()