## Import the libraries

In [6]:
from pyspark.sql.functions import *
from pyspark.sql.window import Window
import numpy as np 
from pyspark.sql.types import *

StatementMeta(spark1, 11, 2, Finished, Available)

## Read the metadata

In [7]:
meta = spark.read.option("header","true").csv("abfss://royadlsfs@royadlsgen2.dfs.core.windows.net/symbols_valid_meta.csv")
display(meta.limit(3))
# meta.limit(3).toPandas()

StatementMeta(spark1, 11, 3, Finished, Available)

SynapseWidget(Synapse.DataFrame, 854d3de6-e82c-4f9f-b762-6f81943542d0)

## Read the stocks data

In [8]:
stocks = spark.read.option("header","true").csv("abfss://royadlsfs@royadlsgen2.dfs.core.windows.net/stocks/")
display(stocks.limit(3))
# stocks.limit(3).toPandas()

StatementMeta(spark1, 11, 4, Finished, Available)

SynapseWidget(Synapse.DataFrame, 0ea8f9c9-d8df-4bd9-9145-f0854277f1ec)

### get the input file names

In [9]:
stocks = stocks.withColumn("filename", input_file_name())
stocks.show(3,truncate=False)

StatementMeta(spark1, 11, 5, Finished, Available)

+----------+-------------------+-------------------+-------------------+-------------------+--------------------+-------+-----------------------------------------------------------------+
|Date      |Open               |High               |Low                |Close              |Adj Close           |Volume |filename                                                         |
+----------+-------------------+-------------------+-------------------+-------------------+--------------------+-------+-----------------------------------------------------------------+
|1962-01-02|0.13127270340919495|0.13127270340919495|0.12417688220739365|0.12417688220739365|0.00688728503882885 |2480300|abfss://royadlsfs@royadlsgen2.dfs.core.windows.net/stocks/HPQ.csv|
|1962-01-03|0.12417688220739365|0.12417688220739365|0.12151595205068588|0.12284641712903976|0.006813489831984043|507300 |abfss://royadlsfs@royadlsgen2.dfs.core.windows.net/stocks/HPQ.csv|
|1962-01-04|0.12284641712903976|0.126837819814682  |0.117968

In [10]:
stocks = stocks.withColumn("filename",split(col("filename"), "/").getItem(4))
stocks = stocks.withColumn("Symbol",split(col("filename"), '\.').getItem(0))
stocks = stocks.drop("filename")
display(stocks.limit(2))
# stocks.limit(2).toPandas()

StatementMeta(spark1, 11, 6, Finished, Available)

SynapseWidget(Synapse.DataFrame, 442e1b2b-7484-48ff-a257-8a87d33f5420)

## Read the ETFs data

In [11]:
etfs = spark.read.option("header","true").csv("abfss://royadlsfs@royadlsgen2.dfs.core.windows.net/etfs/")
display(etfs.limit(3))
# etfs.limit(3).toPandas()

StatementMeta(spark1, 11, 7, Finished, Available)

SynapseWidget(Synapse.DataFrame, 55a508da-1433-4a88-880c-e3d53dbba720)

### get the input file names

In [12]:
etfs = etfs.withColumn("filename", input_file_name())
etfs.show(3,truncate=False)

StatementMeta(spark1, 11, 8, Finished, Available)

+----------+----+-----+------+-----+-----------------+------+---------------------------------------------------------------+
|Date      |Open|High |Low   |Close|Adj Close        |Volume|filename                                                       |
+----------+----+-----+------+-----+-----------------+------+---------------------------------------------------------------+
|1986-04-03|0.0 |4.75 |4.625 |4.625|4.449552059173584|15300 |abfss://royadlsfs@royadlsgen2.dfs.core.windows.net/etfs/CEF.csv|
|1986-04-04|0.0 |4.75 |4.6875|4.75 |4.56981086730957 |12000 |abfss://royadlsfs@royadlsgen2.dfs.core.windows.net/etfs/CEF.csv|
|1986-04-07|0.0 |4.875|4.75  |4.75 |4.56981086730957 |11500 |abfss://royadlsfs@royadlsgen2.dfs.core.windows.net/etfs/CEF.csv|
+----------+----+-----+------+-----+-----------------+------+---------------------------------------------------------------+
only showing top 3 rows



In [13]:
etfs = etfs.withColumn("filename",split(col("filename"), "/").getItem(4))
etfs = etfs.withColumn("Symbol",split(col("filename"), '\.').getItem(0))
etfs = etfs.drop("filename")
display(etfs.limit(2))
# etfs.limit(2).toPandas()

StatementMeta(spark1, 11, 9, Finished, Available)

SynapseWidget(Synapse.DataFrame, 4fb93d27-a1b4-4a31-b2d2-b19786e55c05)

## Bring the data together and save to parquet

In [14]:
df = stocks.unionAll(etfs).dropDuplicates()

StatementMeta(spark1, 11, 10, Finished, Available)

In [10]:
# print(stocks.count())
# print(etfs.count())
# print(df.count())

StatementMeta(fc2621f8-0e70-4acd-8413-258a265242a7, 5, 15, Finished, Available)

24197442
3950926
28118125


In [15]:
c = meta.select("Symbol","Security Name").distinct()
# c.show(2)

StatementMeta(spark1, 11, 11, Finished, Available)

In [16]:
df = df.join(c,on="Symbol")

StatementMeta(spark1, 11, 12, Finished, Available)

In [17]:
df.printSchema()

StatementMeta(spark1, 11, 13, Finished, Available)

root
 |-- Symbol: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Open: string (nullable = true)
 |-- High: string (nullable = true)
 |-- Low: string (nullable = true)
 |-- Close: string (nullable = true)
 |-- Adj Close: string (nullable = true)
 |-- Volume: string (nullable = true)
 |-- Security Name: string (nullable = true)



### Cast string to double

In [19]:
df2 = df.withColumn("Symbol",col('Symbol'))\
        .withColumn("Date",col('Date'))\
        .withColumnRenamed("Security Name",'Security_Name')\
        .withColumn("Open",col('Open').cast(DoubleType()))\
        .withColumn("High",col('High').cast(DoubleType()))\
        .withColumn("Low",col('Low').cast(DoubleType()))\
        .withColumn("Close",col('Close').cast(DoubleType()))\
        .withColumn("Adj_Close",col('Adj Close').cast(DoubleType())).drop("Adj Close")\
        .withColumn("Volume",col('Volume').cast(DoubleType()))
display(df2.limit(3))
# df2.limit(3).toPandas()

StatementMeta(spark1, 11, 15, Finished, Available)

SynapseWidget(Synapse.DataFrame, 243b31e0-a185-4180-93ef-c56b0b9ba9c4)

In [20]:
df2.write.mode("overwrite").parquet("abfss://royadlsfs@royadlsgen2.dfs.core.windows.net/landing/nasdaq.parquet")

StatementMeta(spark1, 11, 16, Finished, Available)

## Read the nasdaq.parquet and create features

In [21]:
df = spark.read.parquet("abfss://royadlsfs@royadlsgen2.dfs.core.windows.net/landing/nasdaq.parquet").orderBy("Symbol","Date")
# display(df.limit(5))
df.limit(5).toPandas()

StatementMeta(spark1, 11, 17, Finished, Available)

Unnamed: 0,Symbol,Date,Open,High,Low,Close,Volume,Security_Name,Adj_Close
0,A,1999-11-18,32.546494,35.765381,28.612303,31.473534,62546300.0,"Agilent Technologies, Inc. Common Stock",27.068665
1,A,1999-11-19,30.71352,30.758226,28.478184,28.880543,15234100.0,"Agilent Technologies, Inc. Common Stock",24.838577
2,A,1999-11-22,29.551144,31.473534,28.657009,31.473534,6577800.0,"Agilent Technologies, Inc. Common Stock",27.068665
3,A,1999-11-23,30.400572,31.205294,28.612303,28.612303,5975600.0,"Agilent Technologies, Inc. Common Stock",24.60788
4,A,1999-11-24,28.701717,29.998211,28.612303,29.372318,4843200.0,"Agilent Technologies, Inc. Common Stock",25.261524


In [22]:
df.printSchema()

StatementMeta(spark1, 11, 18, Finished, Available)

root
 |-- Symbol: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: double (nullable = true)
 |-- Security_Name: string (nullable = true)
 |-- Adj_Close: double (nullable = true)



In [62]:
# df = df.withColumn("Date",to_timestamp(col('Date'),'yyyy-MM-dd'))

StatementMeta(spark1, 8, 63, Finished, Available)

### Set Window config and create udf to calculate median

In [23]:
w = (Window.partitionBy("Symbol").orderBy(to_timestamp(col('Date'),'yyyy-MM-dd').cast('long')).rangeBetween(-30, 0))
median_udf = udf(lambda x: float(np.median(x)), FloatType())

StatementMeta(spark1, 11, 19, Finished, Available)

### Calculate Rolling Median

In [24]:
df = df.withColumn("list", collect_list("Volume").over(w)) \
  .withColumn("rolling_median", median_udf("list")).drop("list")
df.show(2,truncate = False)

StatementMeta(spark1, 11, 20, Finished, Available)

+------+----------+------------------+------------------+-----------------+------------------+---------+----------------------------------------+------------------+--------------+
|Symbol|Date      |Open              |High              |Low              |Close             |Volume   |Security_Name                           |Adj_Close         |rolling_median|
+------+----------+------------------+------------------+-----------------+------------------+---------+----------------------------------------+------------------+--------------+
|AAT   |2011-01-13|21.530000686645508|22.0              |21.18000030517578|21.25             |1.55369E7|American Assets Trust, Inc. Common Stock|16.332218170166016|1.55369E7     |
|AAT   |2011-01-14|21.15999984741211 |21.450000762939453|21.15999984741211|21.309999465942383|1304800.0|American Assets Trust, Inc. Common Stock|16.378332138061523|1304800.0     |
+------+----------+------------------+------------------+-----------------+------------------+------

### Calculate Rolling Mean

In [25]:
df = df.withColumn('rolling_average', avg("Volume").over(w))
df.show(2,truncate = False)

StatementMeta(spark1, 11, 21, Finished, Available)

+------+----------+------------------+------------------+-----------------+------------------+---------+----------------------------------------+------------------+--------------+---------------+
|Symbol|Date      |Open              |High              |Low              |Close             |Volume   |Security_Name                           |Adj_Close         |rolling_median|rolling_average|
+------+----------+------------------+------------------+-----------------+------------------+---------+----------------------------------------+------------------+--------------+---------------+
|AAT   |2011-01-13|21.530000686645508|22.0              |21.18000030517578|21.25             |1.55369E7|American Assets Trust, Inc. Common Stock|16.332218170166016|1.55369E7     |1.55369E7      |
|AAT   |2011-01-14|21.15999984741211 |21.450000762939453|21.15999984741211|21.309999465942383|1304800.0|American Assets Trust, Inc. Common Stock|16.378332138061523|1304800.0     |1304800.0      |
+------+----------+-

### Write to parquet in staging bucket

In [27]:
df.write.mode("overwrite").parquet("abfss://royadlsfs@royadlsgen2.dfs.core.windows.net/staging/nasdaq_fe.parquet")

StatementMeta(spark1, 11, 23, Finished, Available)

In [28]:
# df.orderBy("Symbol","Date").show(30)

StatementMeta(spark1, 11, 24, Finished, Available)