In [1]:
import os
import sys

from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql.types import (
    ArrayType,
    DateType,
    DecimalType,
    IntegerType,
    LongType,
    StringType,
    StructField,
    StructType,
)

# Add the parent directory to the Python path
sys.path.append(os.path.abspath(os.path.join(os.getcwd(), "..")))
from utils import configutils
from utils import sparkutils

In [2]:
# Spark configuration object produced by the project's sparkutils helper;
# it is handed to SparkSession.builder in the next cell.
conf = sparkutils.get_spark_config()

In [3]:
# Create the session (or reuse an already running one) with the helper-built config.
spark: SparkSession = (
    SparkSession.builder
    .config(conf=conf)
    .getOrCreate()
)

In [3]:
# AWS credentials are looked up through configutils.read_application_config;
# they are applied to the S3A Hadoop settings in the following cell.
# NOTE(review): what the helper returns for a missing key is not visible here — confirm.
AWS_ACCESS_KEY = configutils.read_application_config("AWS_ACCESS_KEY_ID")
AWS_SECRET_KEY = configutils.read_application_config("AWS_SECRET_ACCESS_KEY")

In [4]:
# Configure the S3A filesystem on the session's Hadoop configuration so that
# s3a:// paths authenticate with the static access/secret key pair.
hadoopConfig = spark.sparkContext._jsc.hadoopConfiguration()
for _s3a_option, _s3a_value in {
    "fs.s3a.aws.credentials.provider": "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider",
    "fs.s3a.access.key": AWS_ACCESS_KEY,
    "fs.s3a.secret.key": AWS_SECRET_KEY,
}.items():
    hadoopConfig.set(_s3a_option, _s3a_value)

In [5]:
# Batch read (spark.read, not readStream) of the "stock-data" topic from the
# local broker, starting at the earliest available offsets.
#
# Only options prefixed with "kafka." are forwarded to the Kafka consumer.
# The original passed a bare "group.id", which the source silently ignores,
# so the consumer group was never applied. "kafka.group.id" is the supported
# spelling (Spark 3.0+).
kafka_df = (
    spark.read.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "stock-data")
    .option("startingOffsets", "earliest")
    .option("kafka.group.id", "stock-stream-consumer")
    # Do not fail the query if offsets are no longer available on the broker.
    .option("failOnDataLoss", False)
    .load()
)

In [None]:
# Preview the first 20 rows of the raw Kafka batch (before key/value are cast to strings).
kafka_df.show(n=20, truncate=True)

In [30]:
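# Console debug sink for a streaming read. Only usable if kafka_df is created
# with spark.readStream; in this notebook kafka_df is a batch spark.read frame.
# kafka_df.writeStream.format("console").outputMode("append").start().awaitTermination()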

In [7]:
# Schema of the JSON document carried in each Kafka message value: a message
# type, an array of trade records ("data") and a company profile struct.
#
# DecimalType() with no arguments is decimal(10, 0), which rounds every parsed
# number to a whole value (prices, volumes, market cap and share counts lose
# their fractional part). An explicit precision/scale keeps the fraction.
# NOTE(review): Parquet files already written with the old decimal(10, 0)
# columns will not match this schema — confirm before appending to the same path.
DECIMAL_TYPE = DecimalType(38, 18)

schema = StructType(
    [
        StructField("type", StringType()),
        StructField(
            "data",
            ArrayType(
                StructType(
                    [
                        StructField("p", DECIMAL_TYPE),
                        StructField("s", StringType()),
                        StructField("t", LongType()),
                        StructField("v", DECIMAL_TYPE),
                        # NOTE(review): if the upstream feed sends "c" as a list of
                        # condition codes rather than a single integer, this should
                        # be ArrayType(StringType()) — confirm against a sample message.
                        StructField("c", IntegerType()),
                    ]
                )
            ),
        ),
        StructField(
            "companyProfile",
            StructType(
                [
                    StructField("country", StringType()),
                    StructField("currency", StringType()),
                    StructField("exchange", StringType()),
                    StructField("ipo", DateType()),
                    StructField("marketCapitalization", DECIMAL_TYPE),
                    StructField("name", StringType()),
                    StructField("phone", StringType()),
                    StructField("shareOutstanding", DECIMAL_TYPE),
                    StructField("ticker", StringType()),
                    StructField("weburl", StringType()),
                    StructField("logo", StringType()),
                    StructField("finnhubIndustry", StringType()),
                ]
            ),
        ),
    ]
)

In [8]:
# Cast the Kafka key and value columns to strings so the value can be parsed as JSON.
kafka_df = (
    kafka_df
    .withColumn("key", f.col("key").cast("string"))
    .withColumn("value", f.col("value").cast("string"))
)

In [9]:
# Parse the JSON value with the declared schema and promote its top-level
# fields (type, data, companyProfile) to columns next to the message key,
# which is exposed as "symbol".
raw_df = kafka_df.select(
    f.col("key").alias("symbol"),
    f.from_json(f.col("value"), schema).alias("value"),
).select("symbol", "value.*")

In [None]:
# Preview the parsed messages (first 20 rows).
raw_df.show(n=20, truncate=True)

In [11]:
# Keep only messages that carry a key; rows with a null symbol are dropped.
raw_df = raw_df.where(f.col("symbol").isNotNull())

In [None]:
# Preview the messages that remain after removing null symbols (first 20 rows).
raw_df.show(n=20, truncate=True)

In [18]:
# Expand the companyProfile struct into one column per profile field.
company_data = raw_df.select(f.col("companyProfile.*"))

In [None]:
# Preview the flattened company profile columns (first 20 rows).
company_data.show(n=20, truncate=True)

In [None]:
# Print the column names and types of the flattened company profile.
company_data.printSchema()

In [None]:
# Output layout for the company table: reorder the profile columns and expose
# finnhubIndustry under the shorter name "industry".
# NOTE(review): "currency" is declared in the schema but is not selected here —
# confirm the omission is intentional.
final_company_data = company_data.select(
    [
        "ticker",
        "name",
        "phone",
        "weburl",
        "logo",
        f.col("finnhubIndustry").alias("industry"),
        "marketCapitalization",
        "country",
        "exchange",
        "ipo",
        "shareOutstanding",
    ]
)

In [None]:
# Preview the company table as it will be written (first 20 rows).
final_company_data.show(n=20, truncate=True)

In [28]:
# Append the company table to the S3 output location as Parquet.
# Because the mode is "append", every run of this cell adds the rows again.
final_company_data.write.parquet(
    "s3a://stock-data-output/company-data/final",
    mode="append",
)

In [None]:
# Look at the parsed messages again before extracting the trade records (first 20 rows).
raw_df.show(n=20, truncate=True)

In [57]:
# One output row per element of the "data" array, paired with the message symbol.
# explode() is used directly: the original computed the element position with
# posexplode() only to drop it immediately, which yields the same two columns.
price_data = raw_df.select("symbol", f.explode("data").alias("data"))

In [None]:
# Preview the exploded trade records, one struct per row (first 20 rows).
price_data.show(n=20, truncate=True)

In [59]:
# Flatten each trade struct into top-level columns, keyed by "ticker".
# The frame is rebuilt from raw_df instead of reassigning price_data from
# itself: the original cell failed when run a second time because the
# "symbol" column no longer existed after the first run.
price_data = raw_df.select(
    f.col("symbol").alias("ticker"),
    f.explode("data").alias("data"),
).select("ticker", "data.*")

In [None]:
# Preview the flattened trade columns (first 20 rows).
price_data.show(n=20, truncate=True)

In [None]:
final_price_data = 