In [79]:
from pyspark.sql.types import StringType, StructType, StructField, IntegerType, FloatType, MapType, TimestampType
from pyspark.sql import functions as F, SparkSession, types as T
from pyspark.sql.functions import col, lit, udf, from_json, year, month, expr
from pyspark.sql.window import Window
import requests
import json
from datetime import datetime
import os
import ast
import pandas as pd
from sqlalchemy import create_engine, text

import findspark
findspark.init()

## Setting up Enviornment Variable and necessary connections

In [80]:
# Set environment variables
os.environ["SPARK_HOME"] = "C:\\Spark\\spark-3.5.3-bin-hadoop3"
os.environ["HADOOP_HOME"] = "D:\\hadoop3"
os.environ["PATH"] += os.pathsep + os.path.join(os.environ["HADOOP_HOME"], "bin")


# Create SparkSession
mysql_driver_path = "C:\\mysql-connector-j-9.1.0\\mysql-connector-j-9.1.0.jar"
spark = SparkSession.builder \
    .appName("cloudProject") \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.memory", "4g") \
    .config("spark.sql.shuffle.partitions", "10") \
    .config("spark.memory.fraction", "0.8") \
    .config("spark.jars", mysql_driver_path) \
    .getOrCreate()

## DB url
url = "jdbc:mysql://localhost:3306/dummy"

## DB connection String
path = 'db_config.json'
with open(path, 'r') as f:
    db_config = json.load(f)
    
db_cnx = create_engine(f"mysql+pymysql://{db_config['user']}:{db_config['password']}@{db_config['ip']}/{db_config['db_name']}", 
                       pool_recycle=3600)

## ETL

In [59]:
def fetch_stock_data(source_name, api_key):
    url = f'https://www.alphavantage.co/query?function=TIME_SERIES_DAILY&symbol={source_name}&outputsize=full&apikey={api_key}'
    response = requests.get(url)
    data = response.json()
    return data

In [60]:
def spark_get_inc_df(spark, url, stock_data_df, source_name, db_config):
    qry = f"SELECT last_refresh_dt FROM source_audit WHERE source_name = '{source_name}'"
    
    last_refresh_df = spark.read \
        .format("jdbc") \
        .option("url", url) \
        .option("user", db_config['user']) \
        .option("password", db_config['password']) \
        .option("driver", "com.mysql.cj.jdbc.Driver") \
        .option("query", qry) \
        .load()
    
    last_refresh_dt = last_refresh_df.collect()[0]['last_refresh_dt']
    last_refresh_dt = datetime.strptime(last_refresh_dt, '%Y-%m-%d').date()
    filtered_df = stock_data_df[stock_data_df['trade_dt'] >= last_refresh_dt]
    
    return filtered_df

In [61]:
def fn_insert_stock_date(df, source_name, db_cnx):
    df['stock_info'] = df['stock_info'].apply(json.dumps)
    
    source_name = source_name.lower()
    df.to_sql(f'st_stock_{source_name}_info', db_cnx, if_exists='replace', index=False)
    
    db_procedure_name = f'sp_stock_{source_name}'
    with db_cnx.connect() as connection:
        connection.execute(text(f'CALL {db_procedure_name}()'))

In [62]:
# def update_source_audit(spark, url, db_config, source_name, last_refresh_dt, last_exec_dt, tot_exec_time, db_cnx):
#     source_name = source_name.upper()
# #     print(source_name)

#     source_audit_df = spark.read \
#         .format("jdbc") \
#         .option("url", url) \
#         .option("dbtable", "source_audit") \
#         .option("user", db_config['user']) \
#         .option("password", db_config['password']) \
#         .option("driver", "com.mysql.cj.jdbc.Driver") \
#         .load()
    
#     updated_df = source_audit_df.filter(source_audit_df['source_name'] == source_name)

#     ## altering only specific row , according to condition
#     updated_df = source_audit_df.withColumn(
#         'last_refresh_dt', 
#         F.when(source_audit_df['source_name'] == source_name, F.lit(last_refresh_dt))
#         .otherwise(source_audit_df['last_refresh_dt'])
#     ).withColumn(
#         'last_exec_dt',
#         F.when(source_audit_df['source_name'] == source_name, F.lit(last_exec_dt))
#         .otherwise(source_audit_df['last_exec_dt'])
#     ).withColumn(
#         'tot_exec_time',
#         F.when(source_audit_df['source_name'] == source_name, F.lit(tot_exec_time))
#         .otherwise(source_audit_df['tot_exec_time'])
#     )

    
#     updated_df.write \
#         .format("jdbc") \
#         .option("url", url) \
#         .option("dbtable", "source_audit") \
#         .option("user", db_config['user']) \
#         .option("password", db_config['password']) \
#         .option("driver", "com.mysql.cj.jdbc.Driver") \
#         .mode("overwrite") \
#         .save()
    
#     ## This solution is not the most optimal .Pyspark does not natively support Update opertiaon,
#     # so the typical approach involves overwriting the entire table or partition


def fn_update_source_audit(source_name, last_refresh_dt, last_exec_dt, tot_exec_time, db_cnx):
    qry = text("""
        UPDATE source_audit
        SET 
            last_refresh_dt = :last_refresh_dt,
            last_exec_dt = :last_exec_dt,
            tot_exec_time = :tot_exec_time
        WHERE source_name = :source_name
    """)
    
    # Execute the query with parameters as a dictionary
    with db_cnx.connect() as connection:
        connection.execute(qry, {
            'last_refresh_dt': last_refresh_dt,
            'last_exec_dt': last_exec_dt,
            'tot_exec_time': tot_exec_time,
            'source_name': source_name
        })
        connection.commit()

In [63]:
def fn_insert_task_audit(source_name,last_exec_dt,exec_time, db_cnx):
    source_name = source_name.upper()
    task_dict = {
        'task_name' : {source_name},
        'last_exec_dt' : {last_exec_dt},
        'flg_success' : 1,
        'exec_time' : {exec_time}
    }
    df = pd.DataFrame([task_dict])
    df.to_sql('task_audit', db_cnx, if_exists='append', index=False)
    
def fn_insert_source_info(source_name, last_refresh_dt, time_zone, info):
    source_name = source_name.upper()
    source_dict = {
        'source_name': [source_name],  
        'last_refresh_dt': [last_refresh_dt],  
        'time_zone': ['US/Eastern'],  
        'info': [info] 
    }

    df = pd.DataFrame(source_dict)

    df.to_sql('source_info', db_cnx, if_exists='append', index=False)

In [64]:
def fn_fetch_data(source_name, spark, db_config, url, db_cnx):
    start_time = datetime.now()
    
    ## Never share this key
    dcu_mail_api_key = '410VUNUTN1PD45LW'
    #personal_mail_api_key = 'J9M3DNRBBYIK9GXN'

    data = fetch_stock_data(source_name, dcu_mail_api_key)
    
    info_df = pd.DataFrame(data).reset_index()
    last_refresh_dt = info_df.loc[info_df['index'].str.contains('last refreshed', case=False, na=False), 'Meta Data'].iloc[0]
    info = info_df.loc[info_df['index'].str.contains('information', case=False, na=False), 'Meta Data'].iloc[0]
    time_zone = info_df.loc[info_df['index'].str.contains('time zone', case=False, na=False), 'Meta Data'].iloc[0]

    last_exec_dt = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    end_time = datetime.now()
    tot_exec_time = int((end_time - start_time).total_seconds())
    
    
    stock_data_df = info_df[~info_df['Time Series (Daily)'].isna()][['index', 'Time Series (Daily)']]
    stock_data_df.columns = ['trade_dt', 'stock_info']

    stock_data_df['trade_dt'] = pd.to_datetime(stock_data_df['trade_dt']).dt.date
    stock_data_df['stock_info'] = stock_data_df['stock_info'].apply(json.dumps)
    
    
    ## TO get incremental data
    stock_data_df =  spark_get_inc_df(spark, url, stock_data_df, source_name, db_config)

    ## To load the data into table
    fn_insert_stock_date(stock_data_df, source_name, db_cnx)
    
    ## Update Souce Audit table
    #update_source_audit(spark, url, db_config, source_name, last_refresh_dt, last_exec_dt, tot_exec_time, db_cnx)
    fn_update_source_audit(source_name, last_refresh_dt, last_exec_dt, tot_exec_time, db_cnx)
    
    ## Insert data into task audit
    fn_insert_task_audit(source_name,last_exec_dt,tot_exec_time, db_cnx)
    
    ## Insert meta information into source_info table
    fn_insert_source_info(source_name, last_refresh_dt, time_zone, info)

In [65]:
source_list = ['TSLA', 'AMZN', 'IBM', 'MSFT', 'AAPL']    
for source_name in source_list:
    print(source_name)
    fn_fetch_data(source_name, spark, db_config, url, db_cnx)

TSLA
AMZN
IBM
MSFT
AAPL


## Transformations

In [94]:
def fn_load_data(table_name, spark, db_config, url):
    # url = "jdbc:mysql://localhost:3306/dummy"
    
    qry = f"""SELECT * 
    FROM stock_{table_name}_info
    order by trade_dt desc"""
    
    df = spark.read \
        .format("jdbc") \
        .option("url", url) \
        .option("user", db_config['user']) \
        .option("password", db_config['password']) \
        .option("driver", "com.mysql.cj.jdbc.Driver") \
        .option("query", qry) \
        .load()
    
    return df

In [95]:
def fn_strc_change(df, table_name):
    df = df.withColumn("parsed_stock_info", F.from_json(F.col("stock_info"), json_schema))

    df_transformed = df.select(
        F.col("trade_dt"),
        F.col("parsed_stock_info.`1. open`").alias("open"),
        F.col("parsed_stock_info.`2. high`").alias("high"),
        F.col("parsed_stock_info.`3. low`").alias("low"),
        F.col("parsed_stock_info.`4. close`").alias("close"),
        F.col("parsed_stock_info.`5. volume`").alias("volume")
    )
    df_transformed = df_transformed.withColumn("company", F.lit(table_name))
    
    return df_transformed

In [96]:
def fn_data_type_casting(df):
    # Convert numeric fields to proper types
    df = df.withColumn("open", F.col("open").cast("double")) \
           .withColumn("high", F.col("high").cast("double")) \
           .withColumn("low", F.col("low").cast("double")) \
           .withColumn("close", F.col("close").cast("double")) \
           .withColumn("volume", F.col("volume").cast("long"))
    return df

In [97]:
json_schema = StructType([
    StructField("1. open", StringType(), True),
    StructField("2. high", StringType(), True),
    StructField("3. low", StringType(), True),
    StructField("4. close", StringType(), True),
    StructField("5. volume", StringType(), True)
])

In [98]:
def process_and_concat_tables(tables, spark, db_config ,url):
    combined_df = None
    
    for table in tables:
        try:
            df = fn_load_data(table, spark, db_config, url)
            
            # Change structure
            df = fn_strc_change(df, table)
            
            # Transform data
            df = fn_data_type_casting(df)
            
            ## Data append on axis 0
            if combined_df is None:
                combined_df = df
            else:
                combined_df = combined_df.unionByName(df)
            
            print(f"Processing completed for table -- stock_{table}_info")
        except Exception as e:
            print(f"Error processing table stock_{table}_info: {e}")
            
    print('\n\n')
    
    return combined_df

In [99]:
tables = ['tsla', 'amzn', 'ibm', 'msft', 'aapl']
final_df = process_and_concat_tables(tables, spark, db_config, url)

#Cache final_df after it's computed and before running any downstream operations:
final_df.cache()
final_df.show()

Processing completed for table -- stock_tsla_info
Processing completed for table -- stock_amzn_info
Processing completed for table -- stock_ibm_info
Processing completed for table -- stock_msft_info
Processing completed for table -- stock_aapl_info



+----------+-------+--------+--------+------+---------+-------+
|  trade_dt|   open|    high|     low| close|   volume|company|
+----------+-------+--------+--------+------+---------+-------+
|2024-11-26|  341.0|  346.96|  335.66|338.23| 61310388|   tsla|
|2024-11-25| 360.14|  361.93|   338.2|338.59| 95890899|   tsla|
|2024-11-22|341.085|  361.53|   337.7|352.56| 89140722|   tsla|
|2024-11-21| 343.81|347.9899|  335.28|339.64| 57686586|   tsla|
|2024-11-20|  345.0|346.5999|   334.3|342.03| 66340650|   tsla|
|2024-11-19| 335.76|347.3799|  332.75| 346.0| 88852452|   tsla|
|2024-11-18| 340.73|348.5499|  330.01|338.74|126547455|   tsla|
|2024-11-15| 310.57|324.6799|  309.22|320.72|114440286|   tsla|
|2024-11-14| 327.69|  329.98|  310.37|311.18

In [100]:
distinct_companies = final_df.select("company").distinct()

distinct_companies.show()

+-------+
|company|
+-------+
|   tsla|
|   amzn|
|    ibm|
|   msft|
|   aapl|
+-------+



In [101]:
def fn_data_transformation(df):
    df = df.withColumn("year_id", year("trade_dt"))  
    df = df.withColumn("mth_id", expr("year(trade_dt) * 100 + month(trade_dt)"))
    
    df = df.filter(df['mth_id'] >= 201001)
    
    ## Changing Column order
    col_order = ['year_id','mth_id', 'company'] + df.columns[:-3]
    df = df.select(*col_order) 
    return df

In [102]:
final_df = fn_data_transformation(final_df)
final_df.show()

+-------+------+-------+----------+-------+--------+--------+------+---------+
|year_id|mth_id|company|  trade_dt|   open|    high|     low| close|   volume|
+-------+------+-------+----------+-------+--------+--------+------+---------+
|   2024|202411|   tsla|2024-11-26|  341.0|  346.96|  335.66|338.23| 61310388|
|   2024|202411|   tsla|2024-11-25| 360.14|  361.93|   338.2|338.59| 95890899|
|   2024|202411|   tsla|2024-11-22|341.085|  361.53|   337.7|352.56| 89140722|
|   2024|202411|   tsla|2024-11-21| 343.81|347.9899|  335.28|339.64| 57686586|
|   2024|202411|   tsla|2024-11-20|  345.0|346.5999|   334.3|342.03| 66340650|
|   2024|202411|   tsla|2024-11-19| 335.76|347.3799|  332.75| 346.0| 88852452|
|   2024|202411|   tsla|2024-11-18| 340.73|348.5499|  330.01|338.74|126547455|
|   2024|202411|   tsla|2024-11-15| 310.57|324.6799|  309.22|320.72|114440286|
|   2024|202411|   tsla|2024-11-14| 327.69|  329.98|  310.37|311.18|120726109|
|   2024|202411|   tsla|2024-11-13| 335.85|344.5999|

In [103]:
grp_mth_df = final_df.groupBy("company","mth_id").agg(
    F.first("open").alias('open'),
    F.last("close").alias('close'),
    F.max("high").alias('high'),
    F.min("low").alias('low'),
    F.sum("volume").alias('tot_volume')
)

grp_mth_df = grp_mth_df.orderBy(["company", "mth_id"], ascending = True)
grp_mth_df.show()

+-------+------+--------+------+-------+--------+----------+
|company|mth_id|    open| close|   high|     low|tot_volume|
+-------+------+--------+------+-------+--------+----------+
|   aapl|201001|  201.08|214.01| 215.59|  190.25| 541749800|
|   aapl|201002|  202.38|194.73| 205.17|  190.85| 384860000|
|   aapl|201003|  235.49|208.99| 237.48|  205.45| 434077600|
|   aapl|201004|  269.31|235.97| 272.46|  232.75| 441683200|
|   aapl|201005|259.3881|266.35| 267.88|  199.25| 645809100|
|   aapl|201006|  256.71|260.83| 279.01|   242.2| 594687600|
|   aapl|201007|255.8925|248.48| 265.99|   239.6| 559632300|
|   aapl|201008|  241.85|261.85| 264.28|  235.56| 342468600|
|   aapl|201009|   289.0|250.33| 294.73|  246.28| 423211400|
|   aapl|201010|  304.23|282.52|  319.0|  277.77| 436949200|
|   aapl|201011|  313.54|304.18|  321.3|  297.76| 339574500|
|   aapl|201012|  322.95| 316.4| 326.66|  314.89| 249044100|
|   aapl|201101|   335.8|329.57|  348.6|324.8365| 387197700|
|   aapl|201102|  351.24

In [104]:
window_spec = Window.partitionBy("company").orderBy("mth_id")


grp_mth_df = grp_mth_df.withColumn(
    "normalized_close", 
    F.round((F.col("close") / F.first("close").over(window_spec)) * 100, 2)
)

# Operation 2: Calculate percentage change in 'close_price' for each 'company'
grp_mth_df = grp_mth_df.withColumn(
    "pct_change", 
    F.round((F.col("close") - F.lag("close").over(window_spec)) / F.lag("close").over(window_spec) * 100, 2)
)

# Show the result
grp_mth_df.show()

+-------+------+--------+------+-------+--------+----------+----------------+----------+
|company|mth_id|    open| close|   high|     low|tot_volume|normalized_close|pct_change|
+-------+------+--------+------+-------+--------+----------+----------------+----------+
|   aapl|201001|  201.08|214.01| 215.59|  190.25| 541749800|           100.0|      NULL|
|   aapl|201002|  202.38|194.73| 205.17|  190.85| 384860000|           90.99|     -9.01|
|   aapl|201003|  235.49|208.99| 237.48|  205.45| 434077600|           97.65|      7.32|
|   aapl|201004|  269.31|235.97| 272.46|  232.75| 441683200|          110.26|     12.91|
|   aapl|201005|259.3881|266.35| 267.88|  199.25| 645809100|          124.46|     12.87|
|   aapl|201006|  256.71|260.83| 279.01|   242.2| 594687600|          121.88|     -2.07|
|   aapl|201007|255.8925|248.48| 265.99|   239.6| 559632300|          116.11|     -4.73|
|   aapl|201008|  241.85|261.85| 264.28|  235.56| 342468600|          122.35|      5.38|
|   aapl|201009|   28

In [105]:
grp_year_df = final_df.groupBy("company","year_id").agg(
    F.first("open").alias('open'),
    F.last("close").alias('close'),
    F.max("high").alias('high'),
    F.min("low").alias('low'),
    F.sum("volume").alias('tot_volume')
)

grp_year_df = grp_year_df.orderBy(["company","year_id"], ascending = True)
grp_year_df.show()

+-------+-------+-------+------+--------+-------+-----------+
|company|year_id|   open| close|    high|    low| tot_volume|
+-------+-------+-------+------+--------+-------+-----------+
|   aapl|   2010| 322.95|214.01|  326.66| 190.25| 5393747400|
|   aapl|   2011| 403.51|329.57|   426.7|  310.5| 4430690700|
|   aapl|   2012| 510.53|411.23|  705.07|  409.0| 4713007300|
|   aapl|   2013| 554.17|549.03|575.1358|  385.1| 3657913200|
|   aapl|   2014| 112.82|553.13|  651.26|  89.65| 8734011985|
|   aapl|   2015| 107.01|109.33|  134.54|   92.0|13064316775|
|   aapl|   2016| 116.65|105.35|  118.69|  89.47| 9685871785|
|   aapl|   2017| 170.52|116.15|   177.2| 114.76| 6687212823|
|   aapl|   2018| 158.53|172.26|  233.47| 146.59| 8467093298|
|   aapl|   2019| 289.93|157.92|  293.97|  142.0| 7086568153|
|   aapl|   2020| 134.08|300.35|  515.14|  103.1|18552582945|
|   aapl|   2021|178.085|129.41|  182.13| 116.21|22798348120|
|   aapl|   2022| 128.41|182.01|  182.94| 125.87|22050192133|
|   aapl

In [106]:
window_spec = Window.partitionBy("company").orderBy("year_id")


grp_year_df = grp_year_df.withColumn(
    "normalized_close", 
    F.round((F.col("close") / F.first("close").over(window_spec)) * 100, 2)
)

#Calculate pct_change(%) in 'close' for each 'company'
grp_year_df = grp_year_df.withColumn(
    "pct_change", 
    F.round((F.col("close") - F.lag("close").over(window_spec)) / F.lag("close").over(window_spec) * 100, 2)
)

grp_year_df.show()

+-------+-------+-------+------+--------+-------+-----------+----------------+----------+
|company|year_id|   open| close|    high|    low| tot_volume|normalized_close|pct_change|
+-------+-------+-------+------+--------+-------+-----------+----------------+----------+
|   aapl|   2010| 322.95|214.01|  326.66| 190.25| 5393747400|           100.0|      NULL|
|   aapl|   2011| 403.51|329.57|   426.7|  310.5| 4430690700|           154.0|      54.0|
|   aapl|   2012| 510.53|411.23|  705.07|  409.0| 4713007300|          192.15|     24.78|
|   aapl|   2013| 554.17|549.03|575.1358|  385.1| 3657913200|          256.54|     33.51|
|   aapl|   2014| 112.82|553.13|  651.26|  89.65| 8734011985|          258.46|      0.75|
|   aapl|   2015| 107.01|109.33|  134.54|   92.0|13064316775|           51.09|    -80.23|
|   aapl|   2016| 116.65|105.35|  118.69|  89.47| 9685871785|           49.23|     -3.64|
|   aapl|   2017| 170.52|116.15|   177.2| 114.76| 6687212823|           54.27|     10.25|
|   aapl| 