# CSV VS PARQUET

The benchmarks and results from this notebook have been saved and used in an interactive Streamlit dashboard. You can directly run the Streamlit dashboard to explore the results visually.

To launch the dashboard, run the following command in the terminal:


streamlit run streamlit.py

In [1]:
import pandas as pd
import os
import time
import pyarrow as pa
import pyarrow.parquet as pq
import polars as pl
from sklearn.linear_model import LinearRegression
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler


In [2]:
!pip install polars



In [3]:
csv_path ="all_stocks_5yr.csv"

In [4]:
df=pd.read_csv(csv_path)

In [5]:
csv_size =os.path.getsize(csv_path)/(1024 *1024)
print(f"CSV File Size: {csv_size:2f} MB")

CSV File Size: 28.704510 MB


In [6]:
df.head(5)

Unnamed: 0,date,open,high,low,close,volume,name
0,2013-02-08,15.07,15.12,14.63,14.75,8407500,AAL
1,2013-02-11,14.89,15.01,14.26,14.46,8882000,AAL
2,2013-02-12,14.45,14.51,14.1,14.27,8126000,AAL
3,2013-02-13,14.3,14.94,14.25,14.66,10259500,AAL
4,2013-02-14,14.94,14.96,13.16,13.99,31879900,AAL


In [7]:
print(df.info())
print(df.describe())

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 619040 entries, 0 to 619039
Data columns (total 7 columns):
 #   Column  Non-Null Count   Dtype  
---  ------  --------------   -----  
 0   date    619040 non-null  object 
 1   open    619029 non-null  float64
 2   high    619032 non-null  float64
 3   low     619032 non-null  float64
 4   close   619040 non-null  float64
 5   volume  619040 non-null  int64  
 6   name    619040 non-null  object 
dtypes: float64(4), int64(1), object(2)
memory usage: 33.1+ MB
None
                open           high            low          close  \
count  619029.000000  619032.000000  619032.000000  619040.000000   
mean       83.023334      83.778311      82.256096      83.043763   
std        97.378769      98.207519      96.507421      97.389748   
min         1.620000       1.690000       1.500000       1.590000   
25%        40.220000      40.620000      39.830000      40.245000   
50%        62.590000      63.150000      62.020000      62.620000 

# PARQUET COMPRESSION

In [8]:
parquet_path = "all_stocks_5yr.parquet"
start_time = time.time()
df.to_parquet(parquet_path, engine='pyarrow', compression='snappy', index=False)
parquet_write_time = time.time() - start_time


In [9]:
csv_size = os.path.getsize(csv_path) / (1024 * 1024)  # Convert to MB
parquet_size = os.path.getsize(parquet_path) / (1024 * 1024)
print(f"CSV File Size: {csv_size:.2f} MB")
print(f"Parquet File Size: {parquet_size:.2f} MB")

CSV File Size: 28.70 MB
Parquet File Size: 10.15 MB


In [10]:
start_time = time.time()
df.to_csv("temp_1x.csv", index=False)
csv_write_time = time.time() - start_time

In [11]:
print(csv_write_time)

2.570556163787842


In [12]:
start_time = time.time()
pd.read_csv("temp_1x.csv")
csv_read_time = time.time() - start_time

In [13]:
print(csv_read_time)

0.38915443420410156


In [14]:
# Measure Parquet Write Time for 1x
start_time = time.time()
table = pa.Table.from_pandas(df)  # Convert DataFrame to Parquet Table
pq.write_table(table, "all_stocks_5yr_snappy.parquet", compression="snappy")
parquet_write_time = time.time() - start_time

# Print Parquet Write Time
print(f"Parquet Write Time (1x): {parquet_write_time:.4f} sec")


Parquet Write Time (1x): 0.2741 sec


In [15]:
start_time = time.time()
pd.read_parquet(parquet_path)
parquet_read_time = time.time() - start_time

In [16]:
print(parquet_read_time)


0.14321351051330566


In [17]:
df_10x = pd.concat([df] * 10, ignore_index=True)
print(f"Dataset scaled to 10x. New shape: {df_10x.shape}")

Dataset scaled to 10x. New shape: (6190400, 7)


In [18]:
# Define file paths
csv_10x_path = "all_stocks_5yr_10x.csv"
parquet_10x_path = "all_stocks_5yr_10x.parquet"


# Get File Sizes
csv_size_10x = os.path.getsize(csv_10x_path) / (1024 * 1024)  # Convert to MB
parquet_size_10x = os.path.getsize(parquet_10x_path) / (1024 * 1024)

print(f"\n 10x Storage Size Comparison:")
print(f"CSV File Size: {csv_size_10x:.2f} MB")
print(f"Parquet File Size: {parquet_size_10x:.2f} MB")




 10x Storage Size Comparison:
CSV File Size: 288.01 MB
Parquet File Size: 87.58 MB


In [19]:
# Measure CSV Read Time
start_time = time.time()
pd.read_csv(csv_10x_path)
csv_read_time_10x = time.time() - start_time

# Measure Parquet Read Time
start_time = time.time()
pq.read_table(parquet_10x_path).to_pandas()
parquet_read_time_10x = time.time() - start_time

print(f"\n 10x Read Time Comparison:")
print(f"CSV Read Time: {csv_read_time_10x:.4f} sec")
print(f"Parquet Read Time: {parquet_read_time_10x:.4f} sec")



 10x Read Time Comparison:
CSV Read Time: 3.4856 sec
Parquet Read Time: 1.0929 sec


In [20]:
# Measure CSV Write Time
start_time = time.time()
df_10x.to_csv(csv_10x_path, index=False)
csv_write_time_10x = time.time() - start_time

# Measure Parquet Write Time
start_time = time.time()
table_10x = pa.Table.from_pandas(df_10x)
pq.write_table(table_10x, parquet_10x_path, compression="snappy")
parquet_write_time_10x = time.time() - start_time

# Print results
print(f"\n 10x Write Time Comparison:")
print(f"CSV Write Time: {csv_write_time_10x:.4f} sec")
print(f"Parquet Write Time: {parquet_write_time_10x:.4f} sec")





 10x Write Time Comparison:
CSV Write Time: 25.1147 sec
Parquet Write Time: 2.1718 sec


In [21]:
df_100x = pd.concat([df] * 100, ignore_index=True)
print(f"Dataset scaled to 100x. New shape: {df_10x.shape}")

Dataset scaled to 100x. New shape: (6190400, 7)


In [22]:

# Define file paths
csv_100x_path = "all_stocks_5yr_100x.csv"
parquet_100x_path = "all_stocks_5yr_100x.parquet"

#  Efficient CSV Writing using Chunks
chunk_size = 500_000  # Adjust as needed
start_time = time.time()

with open(csv_100x_path, 'w', newline='') as file:
    for i in range(0, len(df_100x), chunk_size):
        df_100x.iloc[i:i+chunk_size].to_csv(file, index=False, header=(i == 0))  # Write header only for first chunk

csv_write_time_100x = time.time() - start_time
print(f" Optimized CSV Write Time (100x): {csv_write_time_100x:.4f} sec")

# Efficient Parquet Writing with Compression
start_time = time.time()
table_100x = pa.Table.from_pandas(df_100x)
pq.write_table(table_100x, parquet_100x_path, compression="snappy")
parquet_write_time_100x = time.time() - start_time

# Get File Sizes
csv_size_100x = os.path.getsize(csv_100x_path) / (1024 * 1024)  # Convert to MB
parquet_size_100x = os.path.getsize(parquet_100x_path) / (1024 * 1024)

#  Print Results
print(f"\n✅ 100x Write Time Comparison:")
print(f"CSV Write Time: {csv_write_time_100x:.4f} sec")
print(f"Parquet Write Time: {parquet_write_time_100x:.4f} sec")

print(f"\n✅ 100x Storage Size Comparison:")
print(f"CSV File Size: {csv_size_100x:.2f} MB")
print(f"Parquet File Size: {parquet_size_100x:.2f} MB")


 Optimized CSV Write Time (100x): 284.1913 sec

✅ 100x Write Time Comparison:
CSV Write Time: 284.1913 sec
Parquet Write Time: 25.9207 sec

✅ 100x Storage Size Comparison:
CSV File Size: 2880.05 MB
Parquet File Size: 861.61 MB


In [23]:


 # Measure Read Time for 100x CSV
start_time = time.time()
df_csv_100x = pd.read_csv(csv_100x_path)
csv_read_time_100x = time.time() - start_time

# Measure Read Time for 100x Parquet
start_time = time.time()
df_parquet_100x = pq.read_table(parquet_100x_path).to_pandas()
parquet_read_time_100x = time.time() - start_time

#  Print Results
print(f"\n 100x Read Time Comparison:")
print(f"CSV Read Time: {csv_read_time_100x:.4f} sec")
print(f"Parquet Read Time: {parquet_read_time_100x:.4f} sec")



 100x Read Time Comparison:
CSV Read Time: 54.9933 sec
Parquet Read Time: 17.8047 sec


In [24]:
parquet_path = "all_stocks_5yr.parquet"

# Load Data with Pandas
start_time = time.time()
df_pandas = pd.read_parquet(parquet_path)
pandas_load_time = time.time() - start_time

#  Load Data with Polars
start_time = time.time()
df_polars = pl.read_parquet(parquet_path)
polars_load_time = time.time() - start_time

#  Print Results
print(f"\n Data Loading Time Comparison:")
print(f"Pandas Load Time: {pandas_load_time:.4f} sec")
print(f"Polars Load Time: {polars_load_time:.4f} sec")


 Data Loading Time Comparison:
Pandas Load Time: 0.1771 sec
Polars Load Time: 0.1848 sec


# Calculate Moving Average 

In [25]:

# Load dataset
df_pandas = pd.read_parquet("all_stocks_5yr.parquet")
df_polars = pl.read_parquet("all_stocks_5yr.parquet")

# Simple Moving Average (SMA)
df_pandas["SMA_20"] = df_pandas["close"].rolling(20).mean()
df_polars = df_polars.with_columns(pl.col("close").rolling_mean(20).alias("SMA_20"))

# Exponential Moving Average (EMA)
df_pandas["EMA_20"] = df_pandas["close"].ewm(span=20, adjust=False).mean()
df_polars = df_polars.with_columns(pl.col("close").ewm_mean(com=9.5).alias("EMA_20"))  # Fixed

# Relative Strength Index (RSI) Function
def calculate_rsi(series, window=14):
    delta = series.diff()
    gain = delta.where(delta > 0, 0).rolling(window).mean()
    loss = -delta.where(delta < 0, 0).rolling(window).mean()
    rs = gain / loss
    return 100 - (100 / (1 + rs))

df_pandas["RSI_14"] = calculate_rsi(df_pandas["close"])
df_polars = df_polars.with_columns(
    (100 - (100 / (1 +
        pl.col("close").diff().clip(0, None).rolling_mean(14) /
        pl.col("close").diff().clip(None, 0).abs().rolling_mean(14)
    ))).alias("RSI_14")
)

# Moving Average Convergence Divergence (MACD)
df_pandas["MACD"] = df_pandas["close"].ewm(span=12, adjust=False).mean() - df_pandas["close"].ewm(span=26, adjust=False).mean()
df_polars = df_polars.with_columns(
    (pl.col("close").ewm_mean(com=5.5) - pl.col("close").ewm_mean(com=12.5)).alias("MACD")
)

# Display Sample Data
print("\nSample Data with Indicators (Pandas):")
print(df_pandas.head())

print("\nSample Data with Indicators (Polars):")
print(df_polars.head())



Sample Data with Indicators (Pandas):
         date   open   high    low  close    volume name  SMA_20     EMA_20  \
0  2013-02-08  15.07  15.12  14.63  14.75   8407500  AAL     NaN  14.750000   
1  2013-02-11  14.89  15.01  14.26  14.46   8882000  AAL     NaN  14.722381   
2  2013-02-12  14.45  14.51  14.10  14.27   8126000  AAL     NaN  14.679297   
3  2013-02-13  14.30  14.94  14.25  14.66  10259500  AAL     NaN  14.677459   
4  2013-02-14  14.94  14.96  13.16  13.99  31879900  AAL     NaN  14.611987   

   RSI_14      MACD  
0     NaN  0.000000  
1     NaN -0.023134  
2     NaN -0.056152  
3     NaN -0.050270  
4     NaN -0.098535  

Sample Data with Indicators (Polars):
shape: (5, 11)
┌────────────┬───────┬───────┬───────┬───┬────────┬───────────┬────────┬───────────┐
│ date       ┆ open  ┆ high  ┆ low   ┆ … ┆ SMA_20 ┆ EMA_20    ┆ RSI_14 ┆ MACD      │
│ ---        ┆ ---   ┆ ---   ┆ ---   ┆   ┆ ---    ┆ ---       ┆ ---    ┆ ---       │
│ str        ┆ f64   ┆ f64   ┆ f64   ┆   ┆ f6

In [26]:

start_time = time.time()
df_pandas["SMA_20"] = df_pandas["close"].rolling(window=20).mean()
df_pandas["EMA_20"] = df_pandas["close"].ewm(span=20, adjust=False).mean()
df_pandas["RSI_14"] = calculate_rsi(df_pandas["close"])
df_pandas["MACD"] = df_pandas["close"].ewm(span=12, adjust=False).mean() - df_pandas["close"].ewm(span=26, adjust=False).mean()
pandas_time = time.time() - start_time

# Measure Polars Execution Time
start_time = time.time()
df_polars = df_polars.with_columns([
    pl.col("close").rolling_mean(20).alias("SMA_20"),
    pl.col("close").ewm_mean(com=9.5).alias("EMA_20"),  # Converted from span=20
    (100 - (100 / (1 +
        pl.col("close").diff().clip(0, None).rolling_mean(14) /
        pl.col("close").diff().clip(None, 0).abs().rolling_mean(14)
    ))).alias("RSI_14"),
    (pl.col("close").ewm_mean(com=5.5) - pl.col("close").ewm_mean(com=12.5)).alias("MACD")  # Converted from span
])
polars_time = time.time() - start_time

# Print Performance Results
print(f"\nPandas vs Polars Execution Time:")
print(f"Pandas Execution Time: {pandas_time:.4f} sec")
print(f"Polars Execution Time: {polars_time:.4f} sec")



Pandas vs Polars Execution Time:
Pandas Execution Time: 0.1426 sec
Polars Execution Time: 0.0652 sec


In [27]:


# Define features and target
features = ["SMA_20", "EMA_20", "RSI_14", "MACD"]
target = "close"

# Drop NaN values (since indicators have missing values at the start)
df_pandas = df_pandas.dropna()

# Extract feature matrix (X) and target variable (y)
X = df_pandas[features]
y = df_pandas[target]

# Split data into train (80%) and test (20%) sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Scale features for better performance
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)


In [28]:
# Train Linear Regression Model
lr_model = LinearRegression()
lr_model.fit(X_train_scaled, y_train)
lr_preds = lr_model.predict(X_test_scaled)
lr_rmse = mean_squared_error(y_test, lr_preds, squared=False)

In [29]:
from sklearn.linear_model import Lasso

# Train Lasso Regression Model
lasso_model = Lasso(alpha=0.1)  # `alpha` controls feature selection

start_time = time.time()
lasso_model.fit(X_train_scaled, y_train)  # Train model
lasso_time = time.time() - start_time

# Predict and calculate RMSE
lasso_preds = lasso_model.predict(X_test_scaled)
lasso_rmse = mean_squared_error(y_test, lasso_preds, squared=False)

# Print Results
print("\n✅ Lasso Regression Performance:")
print(f"RMSE: {lasso_rmse:.4f}")
print(f"Training Time: {lasso_time:.4f} sec")



✅ Lasso Regression Performance:
RMSE: 6.3320
Training Time: 1.7965 sec


In [30]:
from sklearn.ensemble import GradientBoostingRegressor

# Train Gradient Boosting Model
gb_model = GradientBoostingRegressor(n_estimators=50, max_depth=5, learning_rate=0.1, random_state=42)

start_time = time.time()
gb_model.fit(X_train_scaled, y_train)  # Train model
gb_time = time.time() - start_time

# Predict and calculate RMSE
gb_preds = gb_model.predict(X_test_scaled)
gb_rmse = mean_squared_error(y_test, gb_preds, squared=False)

# Print Results
print("\n✅ Gradient Boosting Performance:")
print(f"RMSE: {gb_rmse:.4f}")
print(f"Training Time: {gb_time:.4f} sec")



✅ Gradient Boosting Performance:
RMSE: 3.7749
Training Time: 189.6535 sec


In [31]:
from lightgbm import LGBMRegressor


# Train LightGBM Model
lgb_model = LGBMRegressor(n_estimators=50, max_depth=5, learning_rate=0.1, n_jobs=-1, random_state=42)

start_time = time.time()
lgb_model.fit(X_train_scaled, y_train)  # Train LightGBM
lgb_time = time.time() - start_time

# Predict and calculate RMSE
lgb_preds = lgb_model.predict(X_test_scaled)
lgb_rmse = mean_squared_error(y_test, lgb_preds, squared=False)

# Print Results
print("\n LightGBM Performance:")
print(f"RMSE: {lgb_rmse:.4f}")
print(f"Training Time: {lgb_time:.4f} sec")


[LightGBM] [Info] Auto-choosing col-wise multi-threading, the overhead of testing was 0.008774 seconds.
You can set `force_col_wise=true` to remove the overhead.
[LightGBM] [Info] Total Bins 1020
[LightGBM] [Info] Number of data points in the train set: 495208, number of used features: 4
[LightGBM] [Info] Start training from score 83.017058

 LightGBM Performance:
RMSE: 19.7446
Training Time: 0.5360 sec


In [32]:
!pip install lightgbm



In [33]:

pip install streamlit


Note: you may need to restart the kernel to use updated packages.


In [36]:
import pandas as pd

# Create a DataFrame with benchmarking results using the correct variables
benchmark_results = pd.DataFrame({
    "Metric": [
        "CSV Read Time (1x)", "Parquet Read Time (1x)",
        "CSV Read Time (10x)", "Parquet Read Time (10x)",
        "CSV Read Time (100x)", "Parquet Read Time (100x)",
        "CSV Write Time (1x)", "Parquet Write Time (1x)",
        "CSV Write Time (10x)", "Parquet Write Time (10x)",
        "CSV Write Time (100x)", "Parquet Write Time (100x)",
        "Pandas Execution Time", "Polars Execution Time",
        "Pandas Load Time", "Polars Load Time",
        "Lasso Train Time", "GB Train Time", "LGB Train Time",
        "Lasso MSE", "GB MSE", "LGB MSE"
    ],
    "Value (Seconds/Error)": [
        csv_read_time, parquet_read_time,
        csv_read_time_10x, parquet_read_time_10x,
        csv_read_time_100x, parquet_read_time_100x,
        csv_write_time, parquet_write_time,
        csv_write_time_10x, parquet_write_time_10x,
        csv_write_time_100x, parquet_write_time_100x,
        pandas_time, polars_time,
        pandas_load_time, polars_load_time,
        lasso_time, gb_time, lgb_time,
        lasso_rmse, gb_rmse, lgb_rmse
    ]
})

# Save to CSV
benchmark_results.to_csv("benchmark_results.csv", index=False)
print("✅ Benchmark results saved to 'benchmark_results.csv'")


✅ Benchmark results saved to 'benchmark_results.csv'


In [37]:
import joblib

# Save the best model
joblib.dump(gb_model, "trained_model.pkl")

print("✅ Trained model saved as 'trained_model.pkl'")


✅ Trained model saved as 'trained_model.pkl'


In [40]:
import subprocess

# Run Streamlit in the background without blocking Jupyter Notebook
subprocess.Popen(["streamlit", "run", "app.py"])


<Popen: returncode: None args: ['streamlit', 'run', 'app.py']>

In [41]:
import subprocess
subprocess.Popen(["streamlit", "run", "app.py"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)


<Popen: returncode: None args: ['streamlit', 'run', 'app.py']>

In [42]:
pip install matplotlib

Note: you may need to restart the kernel to use updated packages.


In [43]:
import joblib

# Load the trained model
model = joblib.load("trained_model.pkl")

# Print the expected number of input features
print(f"Model expects {model.n_features_in_} features.")


Model expects 4 features.


In [44]:
import pandas as pd

# Load the dataset used for training
df = pd.read_csv("all_stocks_5yr.csv")  # Replace with your actual dataset file

# Get the feature columns (excluding the target column)
feature_columns = df.drop(columns=["close"]).columns  # Assuming "close" is the target variable

print("Model Features:", list(feature_columns))


Model Features: ['date', 'open', 'high', 'low', 'volume', 'name']


In [45]:
import joblib
from sklearn.preprocessing import StandardScaler

# Assuming X_train was used for training
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)

# Save the trained scaler
joblib.dump(scaler, "scaler.pkl")
print("✅ Scaler saved as 'scaler.pkl'")


✅ Scaler saved as 'scaler.pkl'
