In [None]:
from pyspark.sql import SparkSession
import glob
import numpy as np
import pandas as pd
import os
import concurrent.futures
from tqdm import tqdm

In [None]:
# Point PySpark at a JDK and a Spark distribution before the SparkSession cell runs.
# The fallback paths below are specific to the original author's macOS machine.
# Values already present in the environment take precedence, so the notebook runs
# elsewhere without editing this cell. Export JAVA_HOME / SPARK_HOME in your shell,
# or edit the fallbacks here.
os.environ.setdefault("JAVA_HOME", "/Library/Java/JavaVirtualMachines/adoptopenjdk-8.jdk/Contents/Home/")
os.environ.setdefault("SPARK_HOME", "/Users/simran/Downloads/spark-3.5.5-bin-hadoop3/")

In [None]:
# Input location: the CSV files under ./full_history are read together as one dataset.
directory_path, file_pattern = "./full_history", "*.csv"

In [None]:

# Start (or reuse, via getOrCreate) the Spark session used by every later cell.
# All settings live in one mapping so they can be reviewed and tuned together.
# NOTE(review): no spark.master is set, so the master comes from the launch
# environment (presumably local[*] from a plain Python kernel — confirm). In local
# mode the driver JVM also runs the tasks, so spark.executor.* settings likely have
# no effect. The machine must then supply the driver heap plus the 20g off-heap pool.
spark_session_conf = {
    "spark.driver.bindAddress": "127.0.0.1",
    "spark.executor.memory": "12g",
    "spark.driver.memory": "12g",
    "spark.driver.maxResultSize": "4g",
    "spark.memory.offHeap.enabled": True,
    "spark.memory.offHeap.size": "20g",
    "spark.default.parallelism": "100",
    "spark.sql.shuffle.partitions": "200",
    "spark.executor.extraJavaOptions": "-XX:+UseG1GC",
    # JavaSerializer is Spark's documented default; it is kept explicit here.
    "spark.serializer": "org.apache.spark.serializer.JavaSerializer",
}

session_builder = SparkSession.builder.appName("BigDataProject1")
for conf_key, conf_value in spark_session_conf.items():
    session_builder = session_builder.config(conf_key, conf_value)
spark = session_builder.getOrCreate()


In [None]:
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, DateType

# Explicit schema for the price-history CSVs: a date, six numeric columns, and a
# ticker label. Every field is declared nullable.
# NOTE(review): when this schema is attached to a CSV read, Spark maps fields to file
# columns by position. This order must therefore match the files — confirm against a
# sample file. "StockName" is presumably not a column in the files themselves.
numeric_fields = ["volume", "open", "high", "low", "close", "adj close"]

schema = StructType(
    [StructField("date", DateType(), True)]
    + [StructField(field_name, DoubleType(), True) for field_name in numeric_fields]
    + [StructField("StockName", StringType(), True)]
)

In [None]:
# Load every CSV matching the pattern into a single DataFrame using the typed schema.
# The schema must be attached with .schema(). Passing it via .option("schema", ...)
# is not a CSV reader option, so it was ignored and every column was read as a string.
# NOTE(review): with header=true the header line is skipped, but by default
# (enforceSchema=true) schema fields are assigned to file columns by POSITION, not by
# name. Confirm the schema's column order matches the CSV header, or prices will be
# silently mislabelled. In the default PERMISSIVE mode, schema fields beyond a file's
# last column (e.g. StockName, if absent from the files) are filled with null.
history_df = (
    spark.read.format("csv")
    .option("header", "true")
    # Empty fields are read as null. This is also the built-in reader's default;
    # "treatEmptyValuesAsNulls" is not one of its documented options.
    .option("nullValue", "")
    .schema(schema)
    .load(f"{directory_path}/{file_pattern}")
)

history_df.show(5)

In [None]:
# Drop rows that are exact duplicates across every column.
history_df = history_df.dropDuplicates()

# Represent missing open/close prices as NaN instead of null, in a single fill call.
# NOTE(review): Spark's avg/sum skip nulls but propagate NaN. Any later aggregate over
# open/close will therefore yield NaN for a group containing a missing price. If that
# is not intended, keep these as null, or drop such rows with
# history_df.na.drop(subset=["open", "close"]).
history_df = history_df.na.fill({
    "open": np.nan,
    "close": np.nan,
})

# Keep the cleaned frame in memory for the cells that follow. Caching is lazy and
# takes effect on the first action.
history_df.cache()