# Part 1. Data Clean

### Data is coming from around 400 stocks of miniute-level information from 2023/4/1 to 2025/4/1.

### Check the exact numbers of csv files we have 

In [1]:
import os
import pandas as pd

files = os.listdir(r'C:\Users\Username\OneDrive\Desktop\810project')
print(f"Total csv files: {len([f for f in files if f.endswith('.csv')])}")

Total csv files: 428


### Merge all csv to one for the convenience of following processes

In [None]:
import pandas as pd

path = r'C:\Users\Username\OneDrive\Desktop\810project'
csv_files = [f for f in os.listdir(path) if f.endswith(".csv")]

all_data = []
for file in csv_files:
    df = pd.read_csv(os.path.join(path, file))
    ticker = file.replace(".csv", "")
    df["ticker"] = ticker
    all_data.append(df)

merged_df = pd.concat(all_data, ignore_index=True)
merged_df.to_csv(r'C:\Users\Username\OneDrive\Desktop\810project\merged_raw.csv', index=False)

### Too slow, transfrom to Parquet to improve efficiency

In [None]:
merged_df.to_parquet(r'C:\Users\Username\OneDrive\Desktop\810project\merged_raw.parquet', index=False)


In [None]:
import pandas as pd
df_par = pd.read_parquet(r'C:\Users\Username\OneDrive\Desktop\810project\mermiss_raw.parquet')

### Simply view the data

In [None]:
df_par.shape

In [None]:
df_par.head(10)

In [None]:
df_par.info()

In [None]:
df_par.isna().sum()

### Rename the columns name for better viewing

In [None]:
df_par = df_par.rename(columns={
    'v': 'volume',
    'vw': 'vwap',
    'o': 'open',
    'c': 'close',
    'h': 'high',
    'l': 'low',
    't': 'timestamp',
    'n': 'transactions'
})
df_par.head()

### We are starting to deal with the missing values

### Since the data is min-level stock info, for the price columns, we use ffill to deal with
### Starting as a quant, we need to ensure the truth of data, so just apply ffill

In [None]:
# define the price cols 
price_cols = ['open', 'close', 'high', 'low']

# use for loop to ffill
for col in price_cols:
    df_par[col] = df_par.groupby('ticker')[col].ffill()

# check again the data
df_par.isna().sum()

### For the volume, just fill 0 since it does not matter
### For the volume weighted average price, we apply linear interpoate here

In [None]:
df_par['volume'] = df_par['volume'].fillna(0)

# the vwap need to interpolate in the same ticker
df_par['vwap'] = df_par.groupby('ticker')['vwap'].transform(lambda group: group.interpolate(method='linear'))

df_par.isna().sum()

### Now the data seems good!

### Save the clean df so that we do not need to clean again!

In [None]:
df_par.to_parquet(r'C:\Users\Username\OneDrive\Desktop\810project\merged_cleaned.parquet', index=False)

In [None]:
import pandas as pd
df_par = pd.read_parquet()

### Use spark to do data clean

In [12]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("810Project") \
    .master("local[*]") \
    .config("spark.driver.memory", "24g") \
    .getOrCreate()

In [13]:
df_spark = spark.read.parquet(r'C:\Users\Username\OneDrive\Desktop\810project\mermiss_raw.parquet')
df_spark.printSchema()
df_spark.show(10)


root
 |-- v: double (nullable = true)
 |-- vw: double (nullable = true)
 |-- o: double (nullable = true)
 |-- c: double (nullable = true)
 |-- h: double (nullable = true)
 |-- l: double (nullable = true)
 |-- t: long (nullable = true)
 |-- n: long (nullable = true)
 |-- datetime: string (nullable = true)
 |-- ticker: string (nullable = true)

+-------+--------+-------+--------+-------+------+-------------+---+-------------------+------+
|      v|      vw|      o|       c|      h|     l|            t|  n|           datetime|ticker|
+-------+--------+-------+--------+-------+------+-------------+---+-------------------+------+
|17632.0|137.0174| 137.42|  137.26|137.565|136.85|1680528600000|105|2023-04-03 13:30:00|     A|
| 1011.0| 137.253|137.255|  137.32| 137.32|137.25|1680528660000| 28|2023-04-03 13:31:00|     A|
|  697.0|137.2364| 137.22|  137.33| 137.33|136.95|1680528720000| 18|2023-04-03 13:32:00|     A|
| 4004.0|137.3055| 136.99|  137.57| 137.57|136.99|1680528780000| 79|2023-04-03 

In [14]:
rename_dict = {
    "v": "volume",
    "vw": "vwap",
    "o": "open",
    "c": "close",
    "h": "high",
    "l": "low",
    "t": "timestamp",
    "n": "transactions"
}
for old_name, new_name in rename_dict.items():
    df_spark = df_spark.withColumnRenamed(old_name, new_name)
df_spark.show(5)

+-------+--------+-------+--------+-------+------+-------------+------------+-------------------+------+
| volume|    vwap|   open|   close|   high|   low|    timestamp|transactions|           datetime|ticker|
+-------+--------+-------+--------+-------+------+-------------+------------+-------------------+------+
|17632.0|137.0174| 137.42|  137.26|137.565|136.85|1680528600000|         105|2023-04-03 13:30:00|     A|
| 1011.0| 137.253|137.255|  137.32| 137.32|137.25|1680528660000|          28|2023-04-03 13:31:00|     A|
|  697.0|137.2364| 137.22|  137.33| 137.33|136.95|1680528720000|          18|2023-04-03 13:32:00|     A|
| 4004.0|137.3055| 136.99|  137.57| 137.57|136.99|1680528780000|          79|2023-04-03 13:33:00|     A|
| 7244.0|137.4399| 137.69|137.3784| 137.72|137.11|1680528840000|         107|2023-04-03 13:34:00|     A|
+-------+--------+-------+--------+-------+------+-------------+------------+-------------------+------+
only showing top 5 rows



In [15]:

df_spark.createOrReplaceTempView("market")


spark.sql("""
SELECT
    SUM(CASE WHEN open IS NULL THEN 1 ELSE 0 END) AS null_open,
    SUM(CASE WHEN high IS NULL THEN 1 ELSE 0 END) AS null_high,
    SUM(CASE WHEN low IS NULL THEN 1 ELSE 0 END) AS null_low,
    SUM(CASE WHEN close IS NULL THEN 1 ELSE 0 END) AS null_close,
    SUM(CASE WHEN volume IS NULL THEN 1 ELSE 0 END) AS null_volume,
    SUM(CASE WHEN vwap IS NULL THEN 1 ELSE 0 END) AS null_vwap
FROM market
""").show()


spark.sql("""
SELECT ticker, COUNT(*) AS cnt, SUM(volume) AS total_volume
FROM market
GROUP BY ticker
HAVING total_volume = 0
""").show()


spark.sql("""
SELECT ticker, COUNT(*) AS cnt,
       SUM(CASE WHEN vwap IS NULL THEN 1 ELSE 0 END) AS null_vwap_cnt
FROM market
GROUP BY ticker
HAVING null_vwap_cnt = cnt
""").show()

df_spark.printSchema()

spark.sql("SELECT COUNT(DISTINCT ticker) AS unique_ticker_count FROM market").show()

+---------+---------+--------+----------+-----------+---------+
|null_open|null_high|null_low|null_close|null_volume|null_vwap|
+---------+---------+--------+----------+-----------+---------+
|      218|      216|     219|       198|        209|      190|
+---------+---------+--------+----------+-----------+---------+

+------+---+------------+
|ticker|cnt|total_volume|
+------+---+------------+
+------+---+------------+

+------+---+-------------+
|ticker|cnt|null_vwap_cnt|
+------+---+-------------+
+------+---+-------------+

root
 |-- volume: double (nullable = true)
 |-- vwap: double (nullable = true)
 |-- open: double (nullable = true)
 |-- close: double (nullable = true)
 |-- high: double (nullable = true)
 |-- low: double (nullable = true)
 |-- timestamp: long (nullable = true)
 |-- transactions: long (nullable = true)
 |-- datetime: string (nullable = true)
 |-- ticker: string (nullable = true)

+-------------------+
|unique_ticker_count|
+-------------------+
|               

In [16]:
from pyspark.sql.window import Window
from pyspark.sql.functions import last, col
import sys

w = Window.partitionBy("ticker").orderBy("datetime").rowsBetween(-sys.maxsize, 0)

# forward fill open/high/low/close
for colname in ["open", "high", "low", "close"]:
    df_spark = df_spark.withColumn(colname, last(col(colname), ignorenulls=True).over(w))


In [18]:
from pyspark.sql.functions import lag, lead, when

df_spark = df_spark.fillna({'volume': 0})

w = Window.partitionBy("ticker").orderBy("datetime")
# linear interpolation for vwap (very simplified)
df_spark = df_spark.withColumn("vwap_prev", lag("vwap", 1).over(w))
df_spark = df_spark.withColumn("vwap_next", lead("vwap", 1).over(w))
df_spark = df_spark.withColumn(
    "vwap_interp",
    when(col("vwap").isNull(), (col("vwap_prev") + col("vwap_next")) / 2).otherwise(col("vwap"))
).drop("vwap").withColumnRenamed("vwap_interp", "vwap")

df_spark.show(5)

+-------+------+------+------+-----+-------------+------------+-------------------+------+---------+---------+-------+
| volume|  open| close|  high|  low|    timestamp|transactions|           datetime|ticker|vwap_prev|vwap_next|   vwap|
+-------+------+------+------+-----+-------------+------------+-------------------+------+---------+---------+-------+
|  284.0|  85.5|  85.5|  85.5| 85.5|1680519600000|          16|2023-04-03 11:00:00|   GIS|     NULL|  85.3907|85.5081|
|  713.0| 85.37| 85.37| 85.37|85.37|1680528540000|          17|2023-04-03 13:29:00|   GIS|  85.5081|  85.4951|85.3907|
|85741.0| 85.49|85.555| 85.65|85.43|1680528600000|         275|2023-04-03 13:30:00|   GIS|  85.3907|  85.5402|85.4951|
| 6811.0| 85.57| 85.47| 85.66|85.45|1680528660000|         127|2023-04-03 13:31:00|   GIS|  85.4951|   85.393|85.5402|
| 3919.0|85.475| 85.36|85.475|85.35|1680528720000|          95|2023-04-03 13:32:00|   GIS|  85.5402|  85.2405| 85.393|
+-------+------+------+------+-----+------------

In [None]:
df_spark.write.mode("overwrite").parquet("merged_cleaned_spark.parquet")

In [19]:
spark.stop()

# Part 2: Feature Engineering

### Time Features

In [None]:
df_par["datetime"] = pd.to_datetime(df_par["datetime"])

df_par["minute"] = df_par["datetime"].dt.minute
df_par["hour"] = df_par["datetime"].dt.hour
df_par["dayofweek"] = df_par["datetime"].dt.dayofweek
df_par["is_open_hour"] = df_par["hour"].between(9, 16)

### Price Features

In [None]:
import numpy as np
df_par["hl_spread"] = df_par["high"] - df_par["low"]
df_par["oc_return"] = (df_par["close"] - df_par["open"]) / df_par["open"]

df_par["log_return"] = (
    df_par.groupby("ticker")["close"]
    .apply(lambda x: np.log(x / x.shift(1)))
    .reset_index(drop=True)
)


### Lag Features

In [None]:

df_par["close_lag1"] = (
    df_par.groupby("ticker")["close"]
    .shift(1)
    .reset_index(drop=True)
)


df_par["close_roll_mean_5"] = (
    df_par.groupby("ticker")["close"]
    .transform(lambda x: x.rolling(5).mean())
    .reset_index(drop=True)
)


df_par["volume_roll_std_15"] = (
    df_par.groupby("ticker")["volume"]
    .transform(lambda x: x.rolling(15).std())
    .reset_index(drop=True)
)

In [None]:
df_par.head()

In [None]:
df_par.to_parquet(r'C:\Users\Username\OneDrive\Desktop\810project\dffeatures.parquet', index=False)