In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
import pandas as pd

# Initialize the Spark session with increased driver memory
spark = SparkSession.builder \
    .appName("Stock Data Analysis") \
    .config("spark.driver.memory", "4g") \
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY") \
    .getOrCreate()

# Print the Spark session to confirm it started successfully
print(spark)

24/10/31 01:59:32 WARN Utils: Your hostname, Sarabjots-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 10.0.0.134 instead (on interface en0)
24/10/31 01:59:32 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/10/31 01:59:32 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


<pyspark.sql.session.SparkSession object at 0x127faf650>


In [2]:
file_path = "/Users/sarabjotsingh/Desktop/Dataset/combined_file.csv"
df = spark.read.csv(file_path, header=True, inferSchema=True)
df.show(5)  




+--------------------+-----------------+-----------------+-----------------+-----------------+------+---------+------------+------+-------------+---------+
|                Date|             Open|             High|              Low|            Close|Volume|Dividends|Stock Splits|symbol|Capital Gains|Adj Close|
+--------------------+-----------------+-----------------+-----------------+-----------------+------+---------+------------+------+-------------+---------+
|2016-05-18 00:00:...|4.130000114440918|4.400000095367432|4.099999904632568|4.170000076293945|626100|      0.0|         0.0|  PLSE|         NULL|     NULL|
|2016-05-19 00:00:...|4.179999828338623|4.199999809265137|4.050000190734863|4.079999923706055|258200|      0.0|         0.0|  PLSE|         NULL|     NULL|
|2016-05-20 00:00:...|4.079999923706055|4.159999847412109|4.079999923706055|4.130000114440918| 66600|      0.0|         0.0|  PLSE|         NULL|     NULL|
|2016-05-23 00:00:...|             4.25|             4.25|4.0599

                                                                                

In [None]:
# Get column names and data types
df.dtypes


[('Date', 'string'),
 ('Open', 'double'),
 ('High', 'double'),
 ('Low', 'double'),
 ('Close', 'double'),
 ('Volume', 'bigint'),
 ('Dividends', 'double'),
 ('Stock Splits', 'double'),
 ('symbol', 'string'),
 ('Capital Gains', 'double'),
 ('Adj Close', 'string')]

In [4]:
# Select the Date column and show the first 10 rows
df.select("Date").show(10)


+--------------------+
|                Date|
+--------------------+
|2016-05-18 00:00:...|
|2016-05-19 00:00:...|
|2016-05-20 00:00:...|
|2016-05-23 00:00:...|
|2016-05-24 00:00:...|
|2016-05-25 00:00:...|
|2016-05-26 00:00:...|
|2016-05-27 00:00:...|
|2016-05-31 00:00:...|
|2016-06-01 00:00:...|
+--------------------+
only showing top 10 rows



In [5]:

# Step 1: Convert 'Date' to timestamp and ensure it's in UTC
df = df.withColumn("Date", F.to_timestamp(F.col("Date"), "yyyy-MM-dd HH:mm:ss").alias("Date"))

# Step 2: Convert the Date column from UTC to EST (New York timezone)
# Since PySpark doesn't support timezone-aware timestamps directly, we can use `from_utc_timestamp` 
df = df.withColumn("Date", F.from_utc_timestamp(F.col("Date"), "America/New_York"))


In [6]:
# Show a few rows to check the Date column
df.select("Date").show(10, truncate=False)

+-------------------+
|Date               |
+-------------------+
|2016-05-17 20:00:00|
|2016-05-18 20:00:00|
|2016-05-19 20:00:00|
|2016-05-22 20:00:00|
|2016-05-23 20:00:00|
|2016-05-24 20:00:00|
|2016-05-25 20:00:00|
|2016-05-26 20:00:00|
|2016-05-30 20:00:00|
|2016-05-31 20:00:00|
+-------------------+
only showing top 10 rows



In [7]:
df.dtypes

[('Date', 'timestamp'),
 ('Open', 'double'),
 ('High', 'double'),
 ('Low', 'double'),
 ('Close', 'double'),
 ('Volume', 'bigint'),
 ('Dividends', 'double'),
 ('Stock Splits', 'double'),
 ('symbol', 'string'),
 ('Capital Gains', 'double'),
 ('Adj Close', 'string')]

In [8]:
from pyspark.sql.functions import col

# Cast the relevant columns to float
sorted_df = df.withColumn("Open", col("Open").cast("double")) \
                 .withColumn("High", col("High").cast("double")) \
                 .withColumn("Low", col("Low").cast("double")) \
                 .withColumn("Close", col("Close").cast("double")) \
                 .withColumn("Volume", col("Volume").cast("double")) \
                 .withColumn("Dividends", col("Dividends").cast("double")) \
                 .withColumn("Stock Splits", col("Stock Splits").cast("double")) \
                 .withColumn("Capital Gains", col("Capital Gains").cast("double")) \
                 .withColumn("Adj Close", col("Adj Close").cast("double"))

# Verify the datatypes
sorted_df.printSchema()


root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: double (nullable = true)
 |-- Dividends: double (nullable = true)
 |-- Stock Splits: double (nullable = true)
 |-- symbol: string (nullable = true)
 |-- Capital Gains: double (nullable = true)
 |-- Adj Close: double (nullable = true)



In [9]:
# Sort the DataFrame
sorted_df = df.orderBy("symbol", "Date")

In [10]:
columns = ['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Dividends', 'Stock Splits', 'symbol', 'Capital Gains', 'Adj Close']
# Count total rows
total_counts = sorted_df.count()

# Count missing values for each column
missing_counts = sorted_df.agg(*(F.sum(F.col(col).isNull().cast("int")).alias(col) for col in sorted_df.columns))

# Extract missing values into a dictionary for easy access
missing_dict = missing_counts.first().asDict()

# Calculate non-null counts for each column and print the results
print("\nCounts of non-null values in sorted_df:")
for col in sorted_df.columns:
    non_null_count = total_counts - missing_dict[col]
    print(f"{col}: {non_null_count}, Missing: {missing_dict[col]}")




Counts of non-null values in sorted_df:
Date: 32236153, Missing: 0
Open: 32236148, Missing: 5
High: 32236148, Missing: 5
Low: 32236148, Missing: 5
Close: 32236149, Missing: 4
Volume: 32236153, Missing: 0
Dividends: 32236153, Missing: 0
Stock Splits: 32236153, Missing: 0
symbol: 32236153, Missing: 0
Capital Gains: 6347106, Missing: 25889047
Adj Close: 0, Missing: 32236153


                                                                                

In [11]:
from pyspark.sql import Window
import pyspark.sql.functions as F

# Define window partitioned by symbol and ordered by Date
symbol_window = Window.partitionBy("symbol").orderBy("Date")
symbol_window_desc = symbol_window.rowsBetween(-7, 7)  # 7-day rolling window

# Forward fill and backward fill functions
def forward_fill(df, column):
    return df.withColumn(
        f"{column}_ffill",
        F.last(F.col(column), ignorenulls=True).over(symbol_window)
    )

def backward_fill(df, column):
    return df.withColumn(
        f"{column}_bfill",
        F.first(F.col(column), ignorenulls=True).over(symbol_window_desc)
    )

# Start with the sorted DataFrame
cleaned_df = sorted_df

# Columns to impute
columns_to_impute = ["open", "high", "low", "close"]

# Apply forward and backward fill on sorted_df and store in cleaned_df
for col in columns_to_impute:
    cleaned_df = forward_fill(cleaned_df, col)
    cleaned_df = backward_fill(cleaned_df, col)

# Now, fill null values by averaging forward and backward fills
for col in columns_to_impute:
    cleaned_df = cleaned_df.withColumn(
        col,
        F.when(F.col(col).isNull(), (F.col(f"{col}_ffill") + F.col(f"{col}_bfill")) / 2).otherwise(F.col(col))
    ).drop(f"{col}_ffill", f"{col}_bfill")

# Check for nulls after forward/backward fill
missing_counts = cleaned_df.agg(*(F.sum(F.col(col).isNull().cast("int")).alias(col) for col in columns_to_impute))
print("Nulls after forward/backward fill:", missing_counts.collect())

# Step 3: Impute with a 7-day rolling average for any remaining nulls
for col in columns_to_impute:
    cleaned_df = cleaned_df.withColumn(
        col,
        F.when(F.col(col).isNull(), F.avg(col).over(symbol_window_desc)).otherwise(F.col(col))
    )

# Final null check
final_missing_counts = cleaned_df.agg(*(F.sum(F.col(col).isNull().cast("int")).alias(col) for col in columns_to_impute))
print("Final null counts after all imputations:", final_missing_counts.collect())

# Display cleaned data
cleaned_df.show()

                                                                                

Nulls after forward/backward fill: [Row(open=1, high=1, low=1, close=0)]


                                                                                

Final null counts after all imputations: [Row(open=0, high=0, low=0, close=0)]




+-------------------+------------------+------------------+------------------+------------------+------+---------+------------+------+-------------+---------+
|               Date|              open|              high|               low|             close|Volume|Dividends|Stock Splits|symbol|Capital Gains|Adj Close|
+-------------------+------------------+------------------+------------------+------------------+------+---------+------------+------+-------------+---------+
|2024-01-10 19:00:00|24.565000534057617|24.860200881958008|24.520099639892575|24.860200881958008|   416|      0.0|         0.0|  AAPX|          0.0|     NULL|
|2024-01-11 19:00:00|24.921899795532227|24.921899795532227|24.921899795532227|24.921899795532227|   287|      0.0|         0.0|  AAPX|          0.0|     NULL|
|2024-01-15 19:00:00|              24.0|24.285200119018555|              24.0|24.285200119018555|   671|      0.0|         0.0|  AAPX|          0.0|     NULL|
|2024-01-16 19:00:00| 23.84000015258789|24.092

                                                                                

In [12]:

# Count total rows
total_counts = cleaned_df.count()

# Count missing values for each column
missing_counts = cleaned_df.agg(*(F.sum(F.col(col).isNull().cast("int")).alias(col) for col in cleaned_df.columns))

# Extract missing values into a dictionary for easy access
missing_dict = missing_counts.first().asDict()

# Calculate non-null counts for each column and print the results
print("\nCounts of non-null values in sorted_df:")
for col in cleaned_df.columns:
    non_null_count = total_counts - missing_dict[col]
    print(f"{col}: {non_null_count}, Missing: {missing_dict[col]}")




Counts of non-null values in sorted_df:
Date: 32236153, Missing: 0
open: 32236153, Missing: 0
high: 32236153, Missing: 0
low: 32236153, Missing: 0
close: 32236153, Missing: 0
Volume: 32236153, Missing: 0
Dividends: 32236153, Missing: 0
Stock Splits: 32236153, Missing: 0
symbol: 32236153, Missing: 0
Capital Gains: 6347106, Missing: 25889047
Adj Close: 0, Missing: 32236153


                                                                                

In [13]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Define a window partitioned by 'symbol' for performing the calculation per symbol
windowSpec = Window.partitionBy("symbol").orderBy("Date")

# Replace null and 0 values in 'Stock Splits' with 1
cleaned_df = cleaned_df.withColumn(
    "Stock Splits",
    F.when(F.col("Stock Splits").isNull() | (F.col("Stock Splits") == 0), 1).otherwise(F.col("Stock Splits"))
)

# Calculate 'Adj Close' using the updated 'Stock Splits' column
cleaned_df = cleaned_df.withColumn(
    "Adj Close",
    (F.col("Close") - F.col("Dividends")) / F.col("Stock Splits")
)


In [14]:
# Convert all column names in the DataFrame to lowercase
cleaned_df = cleaned_df.toDF(*[col.lower() for col in cleaned_df.columns])

# Calculate the count of null values for each column
null_counts = cleaned_df.select(
    [F.count(F.when(F.col(col).isNull(), col)).alias(col) for col in cleaned_df.columns]
)

# Show the result
null_counts.show()



+----+----+----+---+-----+------+---------+------------+------+-------------+---------+
|date|open|high|low|close|volume|dividends|stock splits|symbol|capital gains|adj close|
+----+----+----+---+-----+------+---------+------------+------+-------------+---------+
|   0|   0|   0|  0|    0|     0|        0|           0|     0|     25889047|        0|
+----+----+----+---+-----+------+---------+------------+------+-------------+---------+



                                                                                

In [15]:
# Shift the 'Close' price to get the previous day's Close within each symbol group
cleaned_df = cleaned_df.withColumn("Previous Day Close", F.lag("Close").over(windowSpec))

# Calculate Capital Gains
cleaned_df = cleaned_df.withColumn("Capital Gains", F.col("Close") - F.col("Previous Day Close"))

# Show the resulting DataFrame
cleaned_df.show()



+-------------------+------------------+------------------+------------------+------------------+------+---------+------------+------+--------------------+------------------+------------------+
|               date|              open|              high|               low|             close|volume|dividends|stock splits|symbol|       Capital Gains|         adj close|Previous Day Close|
+-------------------+------------------+------------------+------------------+------------------+------+---------+------------+------+--------------------+------------------+------------------+
|2024-01-10 19:00:00|24.565000534057617|24.860200881958008|24.520099639892575|24.860200881958008|   416|      0.0|         1.0|  AAPX|                NULL|24.860200881958008|              NULL|
|2024-01-11 19:00:00|24.921899795532227|24.921899795532227|24.921899795532227|24.921899795532227|   287|      0.0|         1.0|  AAPX| 0.06169891357421875|24.921899795532227|24.860200881958008|
|2024-01-15 19:00:00|         

                                                                                

In [16]:
# Convert all column names in the DataFrame to lowercase
cleaned_df = cleaned_df.toDF(*[col.lower() for col in cleaned_df.columns])

# Calculate the count of null values for each column
null_counts = cleaned_df.select(
    [F.count(F.when(F.col(col).isNull(), col)).alias(col) for col in cleaned_df.columns]
)

# Show the result
null_counts.show()



+----+----+----+---+-----+------+---------+------------+------+-------------+---------+------------------+
|date|open|high|low|close|volume|dividends|stock splits|symbol|capital gains|adj close|previous day close|
+----+----+----+---+-----+------+---------+------------+------+-------------+---------+------------------+
|   0|   0|   0|  0|    0|     0|        0|           0|     0|        10034|        0|             10034|
+----+----+----+---+-----+------+---------+------------+------+-------------+---------+------------------+



                                                                                

In [17]:
# Drop rows where Capital Gains or Previous Day Close is null
cleaned_df = cleaned_df.dropna(subset=["Capital Gains", "Previous Day Close"])

# Verify the result
cleaned_df.show()



+-------------------+------------------+------------------+------------------+------------------+------+---------+------------+------+--------------------+------------------+------------------+
|               date|              open|              high|               low|             close|volume|dividends|stock splits|symbol|       capital gains|         adj close|previous day close|
+-------------------+------------------+------------------+------------------+------------------+------+---------+------------+------+--------------------+------------------+------------------+
|2024-01-11 19:00:00|24.921899795532227|24.921899795532227|24.921899795532227|24.921899795532227|   287|      0.0|         1.0|  AAPX| 0.06169891357421875|24.921899795532227|24.860200881958008|
|2024-01-15 19:00:00|              24.0|24.285200119018555|              24.0|24.285200119018555|   671|      0.0|         1.0|  AAPX| -0.6366996765136719|24.285200119018555|24.921899795532227|
|2024-01-16 19:00:00| 23.84000

                                                                                

In [18]:
# Convert all column names in the DataFrame to lowercase
cleaned_df = cleaned_df.toDF(*[col.lower() for col in cleaned_df.columns])

# Calculate the count of null values for each column
null_counts = cleaned_df.select(
    [F.count(F.when(F.col(col).isNull(), col)).alias(col) for col in cleaned_df.columns]
)

# Show the result
null_counts.show()



+----+----+----+---+-----+------+---------+------------+------+-------------+---------+------------------+
|date|open|high|low|close|volume|dividends|stock splits|symbol|capital gains|adj close|previous day close|
+----+----+----+---+-----+------+---------+------------+------+-------------+---------+------------------+
|   0|   0|   0|  0|    0|     0|        0|           0|     0|            0|        0|                 0|
+----+----+----+---+-----+------+---------+------------+------+-------------+---------+------------------+



                                                                                

In [19]:
from pyspark.sql import functions as F

# Define the end date based on your dataset
end_date = "2024-09-06"

# Calculate the start date for the last 90 days
start_date = F.date_sub(F.lit(end_date), 90)

# Filter the DataFrame based on the last 90 days for each stock
cleaned_df = (
    cleaned_df
    .filter((F.col("Date") >= start_date) & (F.col("Date") <= F.lit(end_date)))
)

# Show the filtered DataFrame
cleaned_df.show()



+-------------------+------------------+------------------+------------------+------------------+------+---------+------------+------+--------------------+------------------+------------------+
|               date|              open|              high|               low|             close|volume|dividends|stock splits|symbol|       capital gains|         adj close|previous day close|
+-------------------+------------------+------------------+------------------+------------------+------+---------+------------+------+--------------------+------------------+------------------+
|2024-06-09 20:00:00|             26.25|             26.25|24.975000381469727|  25.1200008392334| 96778|      0.0|         1.0|  AAPX| -1.0099983215332031|  25.1200008392334|  26.1299991607666|
|2024-06-10 20:00:00|25.309999465942383|28.743000030517575|25.309999465942383|28.719999313354492|135347|      0.0|         1.0|  AAPX|  3.5999984741210938|28.719999313354492|  25.1200008392334|
|2024-06-11 20:00:00| 28.93000

                                                                                

In [20]:
# Save the cleaned DataFrame as a single CSV file in the GCS bucket
output_path = "/Users/sarabjotsingh/Desktop/Dataset/eda_file.csv"

# Use coalesce(1) to combine all partitions into a single file
cleaned_df.coalesce(1).write.csv(output_path, header=True, mode="overwrite")

                                                                                

24/10/31 17:51:36 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 1002765 ms exceeds timeout 120000 ms
24/10/31 17:51:36 WARN SparkContext: Killing executors is not supported by current scheduler.
24/10/31 18:07:02 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$