# AAPL Streaming Analysis with Spark SQL & Structured Streaming
This project demonstrates real-time factor generation and technical signal construction...
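Before the notebook code, here is a minimal plain-Scala sketch of the row-level arithmetic it relies on. It needs no Spark and covers three steps: dropping the colon from a time, adding one minute, and the zero-guarded r_close ratio. The object RowFactors and its method names are hypothetical. They mirror remove_colon_udf, add_one_minute_udf and the r_close column defined in the cell below, and they are meant for checking the logic, not for replacing the UDFs.

```scala
// Plain-Scala sketch (no Spark). Hypothetical helpers that mirror
// remove_colon_udf, add_one_minute_udf and the r_close column.
object RowFactors {

  // "09:30" -> "0930", the same transformation as remove_colon_udf
  def removeColon(s: String): String = s.replace(":", "")

  // Mirrors add_one_minute_udf: "0930" -> 931, "0959" -> 1000.
  // At minute 59, adding 41 rolls over to the next hour (59 + 41 = 100).
  def addOneMinute(hhmm: String): Int = {
    val t = hhmm.trim
    val minute = t.takeRight(2).toInt // last two digits are the minutes
    if (minute < 59) t.toInt + 1 else t.toInt + 41
  }

  // r_close = (close - open) / (high - low), defined as 0 when high == low
  def rClose(open: Double, high: Double, low: Double, close: Double): Double =
    if (high == low) 0.0 else (close - open) / (high - low)
}
```

For example, addOneMinute("0959") gives 1000, and rClose(100.0, 101.0, 100.0, 101.0) gives 1.0 (the bar opened at its low and closed at its high).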


In [1]:
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.expressions._
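import org.apache.spark.sql.functions._ // col, lag, avg, when, substring, udf
import spark.implicits._                 // $"column" syntax and .as[HistData]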

//Create three windows (https://spark.apache.org/docs/3.5.0/api/scala/org/apache/spark/sql/expressions/Window$.html)
//None of them is partitioned, so Spark moves all rows to a single partition (acceptable for one day of minute bars)
val windowSpec1 = Window.orderBy(col("timestamp"))                     //Ordering only, used by lag
val windowSpec3 = Window.orderBy(col("timestamp")).rowsBetween(-2, 0)  //Current row and the 2 before it (3 periods)
val windowSpec6 = Window.orderBy(col("timestamp")).rowsBetween(-5, 0)  //Current row and the 5 before it (6 periods)


//Create case class for data input (the same case class will work for both static and streaming data)
case class HistData(timestamp: String,open: Double,high: Double, low: Double, close: Double)

//Write a udf function remove_colon_udf that drops the colon from a string (e.g., "9:30" becomes "930")
//(the Scala function replace will be useful here)
val remove_colon_udf= udf((s: String) => s.replace(":",""))

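/* Write a udf function add_one_minute_udf that adds one minute to a time. For example:
930 becomes 931 (add 1)
959 becomes 1000 (add 41, which rolls minute 59 over to the next hour)
The result is an Int, so "0930" becomes 931. The join below compares it with a string column,
which Spark handles with an implicit cast.
*/
val add_one_minute_udf = udf((d: String) => {
    val t = d.trim
    val minute = t.takeRight(2).toInt //last two digits are the minutes
    if (minute < 59) t.toInt + 1
    else t.toInt + 41
})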

/*

Read the static file into a dataframe and do calculations

1. The timestamp column contains date and time. Keep only the time as "HH:mm" (substring(-8, 5) assumes the value ends in "HH:mm:ss")
2. Using the previously written udf, remove the colon ("9:30" becomes "930")
3. Create a new column "next_timestamp" using the add_one_minute_udf on timestamp
4. Calculate r_close.
   r_close = (close - open)/(high - low)
   r_close indicates how the stock traded. If the close was the high and the open the low, the value is 1
   (i.e., the stock rose steadily over the minute). If the close is the low and the open the high, the value is -1
   (i.e., the stock tanked steadily over the minute).
   (IMPORTANT NOTE: if high == low, Spark's division returns null rather than throwing an exception
   (with ANSI mode off, the Spark 3.5 default). Check for high == low and return 0 in that case.)
5. Lag the price by 1 minute.
    See: https://spark.apache.org/docs/3.5.0/api/java/org/apache/spark/sql/functions.html#lag-org.apache.spark.sql.Column-int-
    (use lag(Column e, int offset) over windowSpec1)
    Call this column p_close (or previous_close). The first row has no previous close, so p_close is null there.
6. Calculate pct_change, the one-minute percent change (close/p_close - 1)
7. Calculate ma3, the 3-minute moving average of pct_change
    See the example
8. Calculate ma6, the 6-minute moving average of pct_change
9. Select the columns next_timestamp, timestamp, r_close, ma3, ma6 and close
    We'll use this for the join with the streaming dataframe
    Name the resulting dataframe hist_data

*/
    
val hist_data = spark.read
                    .option("inferSchema",true)
                    .option("header",true)
                    .csv("AAPL29.csv")
                    .as[HistData] //Typed view of the rows; column names must match the HistData fields
                    .select(substring(col("timestamp"),-8,5).as("timestamp"),$"open",$"high",$"low",$"close")
                    .withColumn("timestamp",remove_colon_udf($"timestamp"))
                    .withColumn("next_timestamp",add_one_minute_udf($"timestamp"))
                    .withColumn("r_close",
                                when(col("high")===col("low"),0)
                                .otherwise((col("close")-col("open"))/(col("high")-col("low"))))
                    //No default value: the first row gets a null p_close instead of 0 (which would divide by zero)
                    .withColumn("p_close", lag(col("close"),1).over(windowSpec1))
                    .withColumn("pct_change",col("close")/col("p_close")-1)
                    .withColumn("ma3",avg("pct_change").over(windowSpec3)) //avg skips nulls
                    .withColumn("ma6",avg("pct_change").over(windowSpec6))
                    .select("next_timestamp","timestamp","r_close","ma3","ma6","close")

/* The stream
    * Streaming data is read from the liveStream directory
    * Note that this is not kosher because the timestamps in the streamed files are also in hist_data
    * In practice, they won't be (the stream is the future), but the assignment would then become very complicated!

1. Create a schema for the streaming data

*/

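//Encoders.product is the public API for deriving a schema from a case class
//(ScalaReflection is Spark-internal)
import org.apache.spark.sql.Encoders
val liveStreamSchema: StructType = Encoders.product[HistData].schema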

/*
2. Manipulate the timestamp: keep the date (first 10 characters) in its own column and reduce the time to "0930" form
3. Rename the timestamp column to time (so that it won't clash with the timestamp column in hist_data)
4. Rename the close column to new_close


*/
val liveDataStream = spark.readStream
                        .schema(liveStreamSchema)
                        .option("header",false)
                        .option("maxFilesPerTrigger",1) //at most one new file per micro-batch
                        .csv("liveStream")
                        .select(substring(col("timestamp"),1,10).as("date"), //substring positions are 1-based
                                substring(col("timestamp"),-8,5).as("timestamp"),
                                $"close")
                        .withColumn("timestamp",remove_colon_udf($"timestamp"))
                        .withColumnRenamed("close","new_close")
                        .withColumnRenamed("timestamp","time")

/* Join hist_data and liveDataStream and apply the trading rule (this is the query)

5. Join hist_data and liveDataStream using next_timestamp from hist_data and the renamed timestamp (time) from
liveDataStream (this will join time t from hist_data with time t+1 from the stream)
6. Apply the rules:
    * ma3 > ma6 (both are from hist_data)
    * close <= new_close (close from hist_data, new_close from liveDataStream)
    * r_close > 0.9 (from hist_data)
7. Select the stream date and time, new_close, ma3, ma6, r_close and the historical close
8. Write the stream to the console

*/


val liveStreamFactors = liveDataStream
                        .join(hist_data, hist_data("next_timestamp") === liveDataStream("time"))
                        .filter("ma3 > ma6")
                        .filter("close <= new_close")
                        .filter("r_close > 0.9")
                        .select("date","time","new_close","ma3","ma6","r_close","close")
                        .writeStream
                        .outputMode("append")
                        .format("console")
                        .start()
    
    


Intitializing Scala interpreter ...

Spark Web UI available at http://192.168.7.189:4041
SparkContext available as 'sc' (version = 3.5.2, master = local[*], app id = local-1732312156598)
SparkSession available as 'spark'


import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.expressions._
import org.apache.spark.sql.functions.udf
windowSpec1: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@6726acb0
windowSpec3: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@5dc3361a
windowSpec6: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@747bc527
defined class HistData
remove_colon_udf: org.apache.spark.sql.expressions.UserDefinedFunction = SparkUserDefinedFunction($Lambda$2383/0x0000000800e87840@21c827cd,StringType,List(Some(class[value[0]: string])),Some(class[value[0]: string]),None,true,true)
add_one_minute_udf: org....


-------------------------------------------
Batch: 0
-------------------------------------------
+----------+----+---------+--------------------+--------------------+-------+------+
|      date|time|new_close|                 ma3|                 ma6|r_close| close|
+----------+----+---------+--------------------+--------------------+-------+------+
|2024-10-29|1636|    232.5|1.388339337567619...|-3.57633209703833...|    1.0| 232.5|
|2024-10-29|1655|   232.46|2.585376759612900...|7.295182104053814E-6|    1.0|232.29|
|2024-10-29|1657|   232.56|5.021803641969876E-4|3.301205429367034E-4|    1.0|232.56|
|2024-10-29|1706|    232.7|1.433648594277118...| 6.87138792207868E-5|    1.0| 232.6|
|2024-10-29|1724|   232.96|2.013605464328272E-4|7.594917581880505E-8|    1.0|232.89|
+----------+----+---------+--------------------+--------------------+-------+------+


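The rows printed in Batch 0 are the streamed minutes that survive the three filters after the join. Below is a minimal sketch of that matching logic using small in-memory collections instead of DataFrames. The case classes HistRow and LiveRow are hypothetical stand-ins for rows of hist_data and liveDataStream. The sketch shows only the pairing and filtering rule, not how Spark executes a stream-static join.

```scala
// Plain-Scala sketch (no Spark) of the next-minute join and the trading rule.
// HistRow and LiveRow are hypothetical stand-ins for rows of hist_data and liveDataStream.
object TradingRule {

  final case class HistRow(nextTimestamp: Int, close: Double, rClose: Double, ma3: Double, ma6: Double)
  final case class LiveRow(time: String, newClose: Double)

  // The three filters applied after the join
  def signal(h: HistRow, l: LiveRow): Boolean =
    h.ma3 > h.ma6 && h.close <= l.newClose && h.rClose > 0.9

  // Inner join on next_timestamp == time (minute t of history with minute t + 1 of the stream),
  // keeping only the pairs that pass the rule; time is parsed as Int here to compare numerically
  def signals(hist: Seq[HistRow], live: Seq[LiveRow]): Seq[(LiveRow, HistRow)] = {
    val byNext: Map[Int, Seq[HistRow]] = hist.groupBy(_.nextTimestamp)
    for {
      l <- live
      h <- byNext.getOrElse(l.time.toInt, Seq.empty[HistRow])
      if signal(h, l)
    } yield (l, h)
  }
}
```

Looking up history by nextTimestamp pairs each streamed minute t + 1 with the factors computed at minute t. The join condition on next_timestamp and time produces the same pairing.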

In [None]:
liveStreamFactors.stop()
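The ma3 and ma6 columns in the batch output are trailing means of one-minute percent changes. Below is a minimal plain-Scala sketch of that window arithmetic, assuming the closes are already sorted by time (windowSpec1, windowSpec3 and windowSpec6 ensure this in Spark). WindowFactors is a hypothetical name. Missing values are modeled as None to mirror Spark's nulls, and trailingMean skips them the same way avg does.

```scala
// Plain-Scala sketch (no Spark): hypothetical helpers mirroring p_close, pct_change, ma3 and ma6.
// Assumes the input closes are already ordered by time.
object WindowFactors {

  // lag(close, 1): the first row has no previous close (null in Spark, None here)
  def previousClose(closes: Vector[Double]): Vector[Option[Double]] =
    if (closes.isEmpty) Vector.empty
    else None +: closes.init.map(c => Option(c))

  // pct_change = close / p_close - 1; None when p_close is missing or 0
  // (Spark returns null for division by zero with ANSI mode off)
  def pctChange(closes: Vector[Double]): Vector[Option[Double]] =
    closes.zip(previousClose(closes)).map {
      case (c, Some(p)) if p != 0.0 => Some(c / p - 1)
      case _                        => None
    }

  // avg(...).over(rowsBetween(-(n - 1), 0)): mean of the non-missing values among
  // the current row and the n - 1 rows before it; None if all of them are missing
  def trailingMean(xs: Vector[Option[Double]], n: Int): Vector[Option[Double]] =
    xs.indices.toVector.map { i =>
      val present = xs.slice(math.max(0, i - n + 1), i + 1).flatten
      if (present.isEmpty) None else Some(present.sum / present.size)
    }
}
```

With closes 100, 125 and 100, pctChange gives None, 0.25 and about -0.2. At the third row, trailingMean(_, 3) is about 0.025. The first row stays None because there is no previous close.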