In [None]:
import json
import os
from urllib.parse import quote_plus

import yaml
import pandas as pd
import numpy as np
import tensorflow as tf

from pyspark.sql import SparkSession
from pyspark.sql import DataFrame
from pyspark.sql.types import StructType
from pyspark.sql.functions import col, to_date, to_timestamp
from pyspark.errors import PySparkException
from pymongo import MongoClient
from pymongo import errors

In [None]:
def gen_mongo_uri(mongo_conf) -> str | None:
    """
    generate mongo connection uri based on input
    :return:
    """
    if not mongo_conf:
        return None
    return (f"mongodb+srv://{mongo_conf['user']}:"
            f"{mongo_conf['token']}@"
            f"{mongo_conf['host']}"
            f"/?retryWrites=true&w=majority")

def get_property(conf) -> dict:
    """
    Pick the JDBC connection properties out of a database configuration.

    Only the ``user``, ``password`` and ``driver`` entries are kept; any of
    them that is absent from ``conf`` is simply left out of the result.

    :param conf: mapping holding the database settings
    :return: new dict restricted to the connection-property keys
    """
    wanted = ('user', 'password', 'driver')
    return {name: value for name, value in conf.items() if name in wanted}

def gen_maria_jdbc(conf, db: str = '') -> str | None:
    """
    get connection jdbc string for maria database
    """
    if not conf:
        return None

    db = conf['database'] if not db else db
    return (f"jdbc:mysql://{conf['host']}:"
            f"{conf['port']}/"
            f"{db}?permitMysqlScheme")

def gen_postgres_jdbc(conf, db: str = '') -> str | None:
    """
    get connection jdbc string for maria database
    """
    if not conf:
        return None
    db = conf['database'] if not db else db
    return (f"jdbc:postgresql://{conf['host']}:"
            f"{conf['port']}/{db}")

def gen_mssql_jdbc(conf, db: str = '') -> str | None:
    """
    get connection jdbc string for maria database
    """
    if not conf:
        return None
    db = conf['database'] if not db else db
    return (f"jdbc:sqlserver://{conf['host']}:{conf['port']};"
            f"databaseName={db};encrypt=true;trustServerCertificate=true;")

def gen_azure_jdbc(conf, db: str = '') -> str | None:
    """
    get connection jdbc string for azure sql database
    """
    if not conf:
        return None
    db = conf['database'] if not db else db
    return (f"jdbc:sqlserver://{conf['host']}:{conf['port']};"
            f"databaseName={db};encrypt=true;")


def init_mongodb_client(uri: str) -> MongoClient | None:
    """
    Create a MongoDB client and verify that the server can be reached.

    Constructing ``MongoClient`` does not contact the server, so the
    constructor alone cannot reveal an unreachable deployment. A ``ping``
    command is issued to force the first round-trip; it may block until the
    driver's server-selection timeout elapses.

    :param uri: MongoDB connection string
    :return: a usable ``MongoClient``, or ``None`` when the server cannot be
             reached or rejects the connection
    """
    client = None
    try:
        # Initialize new MongoDB client
        client = MongoClient(uri)
        # Force a round-trip so connection problems surface here instead of
        # at the first real query.
        client.admin.command('ping')
    except (errors.ConnectionFailure, errors.OperationFailure) as e:
        # Handle connection failure gracefully
        print(f"Failed to connect to MongoDB: {e}")
        if client is not None:
            # Do not leak the background resources of an unusable client.
            client.close()
        return None
    return client

def _records_to_dataframe(spark, sc, records) -> DataFrame:
    """Turn a list of JSON-serialisable records into a Spark DataFrame."""
    json_lines = [json.dumps(record) for record in records]
    return spark.read.json(sc.parallelize(json_lines))


def prepare_dataframe(spark, sc, data) -> DataFrame:
    """
    prepare dataframe when the data might be in different data types

    Accepted inputs are a pandas DataFrame, a Spark DataFrame, a list of
    records or a single dict record. Anything else, and any empty input,
    yields an empty Spark DataFrame with an empty schema.

    :param spark: spark session
    :param sc: spark context
    :param data: data to be processed
    :return: spark sql dataframe
    """
    if isinstance(data, pd.DataFrame) and not data.empty:
        print('data is pandas df and not empty')
        return spark.createDataFrame(data)

    if isinstance(data, DataFrame) and not data.isEmpty():
        print('data is spark df and not empty')
        return data

    # The truthiness checks keep empty containers out of the JSON path: an
    # empty dict would otherwise become a one-row, zero-column DataFrame
    # that callers do not recognise as empty.
    if isinstance(data, list) and data:
        print('data is list and not empty')
        return _records_to_dataframe(spark, sc, data)

    if isinstance(data, dict) and data:
        print('data is dictionary and not empty')
        return _records_to_dataframe(spark, sc, [data])

    print('data empty')
    # initialize empty dataframe
    return spark.createDataFrame([], StructType([]))
    
def write_to_mongo(spark, data,
                   uri: str, db: str, col: str) -> None:
    """
    Append ``data`` to a MongoDB collection through the Spark connector.

    Nothing is written when the prepared DataFrame is empty.

    :param spark: sparkSession
    :param data: payload accepted by ``prepare_dataframe``
    :param uri: str, MongoDB connection URI
    :param db: str, target database name
    :param col: str, target collection name
    :return:
    """
    frame = prepare_dataframe(spark, spark.sparkContext, data)
    if frame.isEmpty():
        # Nothing to persist.
        return

    print('------------------writing data to mongodb----------------------')
    writer = frame.write.format("mongo").options(
        uri=uri, database=db, collection=col)
    writer.mode("append").save()
    print('------------------writing data to mongodb complete----------------------')

def write_to_database(spark, data, conf,
                   db: str, write_table: str, type: str = 'mariadb') -> None:
    """
    Append ``data`` to a relational database table over JDBC.

    :param spark: sparkSession
    :param data: payload accepted by ``prepare_dataframe``
    :param conf: settings of the target server; passed to ``get_property``
                 and to the matching ``gen_*_jdbc`` helper
    :param db: database name; when empty the helper falls back to
               ``conf['database']``
    :param write_table: name of the destination table
    :param type: target system, one of ``'mariadb'``, ``'postgres'``,
                 ``'mssql'`` or ``'azure'``
    :return:
    :raises ValueError: when ``type`` is not one of the supported systems
    """
    # Each database type maps to its own URL builder.
    jdbc_builders = {
        'mariadb': gen_maria_jdbc,
        'postgres': gen_postgres_jdbc,
        'mssql': gen_mssql_jdbc,
        'azure': gen_azure_jdbc,
    }
    # Validate the type first so an unsupported value fails before any
    # Spark work is started.
    builder = jdbc_builders.get(type)
    if builder is None:
        raise ValueError('error when generating jdbc')
    jdbc = builder(conf, db)

    sc = spark.sparkContext
    df = prepare_dataframe(spark, sc, data)
    properties = get_property(conf)

    if not df.isEmpty():
        print(f'------------------writing data to {type}----------------------')
        df.write.jdbc(
            url=jdbc,
            table=write_table,
            mode="append",
            properties=properties
        )
        print(f'------------------writing data to {type} complete----------------------')
    else:
        print('empty dataset')

def sentiment_analysis(data, tokenizer, model) -> pd.DataFrame:
    """
    Score every entry of ``data['text']`` and store the class probabilities
    in a ``sentiment`` column.

    Scores are collected by row position and assigned as one column, so the
    result is correct for any index (filtered, shuffled, non-integer), not
    only for a default 0..n-1 index.

    :param data: pandas DataFrame with a ``text`` column; modified in place
    :param tokenizer: callable invoked as
                      ``tokenizer(text, truncation=True, return_tensors="tf")``
    :param model: object whose ``predict(...)`` result exposes ``.logits``
    :return: ``data`` itself, with the added ``sentiment`` column
    """
    texts = data['text']
    # One object slot per row; each slot receives that row's probability
    # array exactly as returned by ``.numpy()``.
    scores = np.empty(len(texts), dtype=object)

    print('------------------predicting sentiments----------------------')
    for pos, text in enumerate(texts):
        tokenized_news = tokenizer(text, truncation=True, return_tensors="tf")
        logits = model.predict(tokenized_news).logits
        scores[pos] = tf.nn.softmax(logits).numpy()
    print('------------------predicting sentiments complete----------------------')

    # Assigning an array is positional, so row i receives scores[i]
    # regardless of the index labels.
    data['sentiment'] = scores
    return data

In [None]:
# Read the pipeline settings from the YAML file in the working directory.
with open('conf.yaml', 'r') as conf_file:
    config = yaml.safe_load(conf_file)

# Connection string for the Spark MongoDB connector (None when the
# 'mongodb' section is empty).
mongo_uri = gen_mongo_uri(config['mongodb'])

In [None]:
try:
    # getting the spark instance
    spark = SparkSession.builder \
        .appName('Big Data Project ETL') \
        .config("spark.driver.bindAddress", "0.0.0.0") \
        .config("spark.mongodb.input.uri", mongo_uri) \
        .config("spark.mongodb.output.uri", mongo_uri) \
        .config("spark.jars.packages", "com.microsoft.azure:spark-mssql-connector_2.12:1.2.0,"
                                       "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1") \
        .getOrCreate()
except PySparkException as e:
    print(f"Failed to get or create Spark: {e}")
    # Every later cell needs `spark`; stop here instead of failing further
    # down with an unrelated NameError.
    raise
except Exception as e:
    print(f'Exception Caught: {e}')
    # Same reasoning: surface the real cause at the point of failure.
    raise

In [None]:
# Ticker symbols, kept in their original order; each string holds one of the
# original rows and is split on whitespace into a flat list.
stock_symbols = (
    'AAPL MSFT NVDA META AMZN TSLA GOOGL '
    'ON DBD DSGX GTLB LOGI CRSR '
    'LNG SWN APA BTU CL '
    'BMY THC TNDM '
    'MOS AXTA KOP '
    'SBLK EME DNOW'
).split()


In [None]:
# Read the `yf_historical_data` table from MariaDB over JDBC.
maria_hist_data_df = spark.read.jdbc(
    table='yf_historical_data',
    url=gen_maria_jdbc(config['mariadb']),
    properties=get_property(config['mariadb']),
)

In [None]:
# load sentiment data from csv
# header=True takes the column names from the file's first line. Without it
# Spark names the columns _c0, _c1, ... and the join on 'symbol'/'date'
# cannot resolve its keys.
# NOTE(review): assumes the CSV header row contains `symbol` and `date` - confirm.
news_sp_df = spark.read.csv('data/news_sentiment_all.csv', header=True)

In [None]:
# Inner-join the historical data with the news sentiment on (symbol, date).
joined_df = maria_hist_data_df.join(
    other=news_sp_df,
    how='inner',
    on=['symbol', 'date'],
)

In [None]:
# Append the joined result to `historical_with_sentiment` in the
# `finance_out` database on MariaDB.
write_to_database(
    spark,
    joined_df,
    config['mariadb'],
    db='finance_out',
    write_table='historical_with_sentiment',
    type='mariadb',
)

In [None]:
# Shut down the Spark session now that the last write has been issued.
spark.stop()