# Pedro - Short Squeeze Predictor
---

### 1. Libraries Import

In [None]:
import numpy as np
import pandas as pd
import yfinance as yf
from pathlib import Path
import pandas_market_calendars as mcal
import tensorflow as tf
from tensorflow.keras.layers import Dense
from tensorflow.keras.models import Sequential
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler,OneHotEncoder
from sklearn.metrics import classification_report, confusion_matrix, balanced_accuracy_score
from sklearn.ensemble import RandomForestClassifier
from imblearn.over_sampling import SMOTE
from imblearn.metrics import classification_report_imbalanced
import hvplot.pandas
import matplotlib.pyplot as plt
from joblib import dump

### 2. Data Preparation Constants

In [None]:
# Trading-day offsets (1 = first session on/after the insider trade) whose closes are tracked
Desired_Days = [1, 2, 5, 7, 15, 30]

# Screening thresholds applied while loading the CSVs
ShortFloat = 17               # minimum 'Short % of Float'
MarketCap = 300_000_000       # minimum 'Market Cap'
Insider_Amount = 500_000      # minimum insider 'Total Amount'

# Input files
short_float_filepath = "Resources/ShortFloat.csv"
insider_trading_filepath = "Resources/InsiderTrading.csv"

### 3. Data Loading and Preprocessing Function

In [None]:
def load_and_preprocess_data(short_float_filepath, insider_trading_filepath,
                             min_short_float=None, min_market_cap=None,
                             min_insider_amount=None):
    """Load the short-interest and insider-trading CSVs and apply the screening filters.

    Parameters
    ----------
    short_float_filepath : str or path-like
        CSV with the short-interest data. Must contain 'Short % of Float' and
        'Market Cap'; the 'ShortSqueeze.com Short Interest Data' column, if present,
        is renamed to 'Company Name'.
    insider_trading_filepath : str or path-like
        CSV with insider trades. Must contain 'Total Amount', 'Share Price' and 'Date'.
    min_short_float, min_market_cap, min_insider_amount : number, optional
        Inclusive lower bounds for 'Short % of Float', 'Market Cap' and
        'Total Amount'. When omitted, the module-level constants ``ShortFloat``,
        ``MarketCap`` and ``Insider_Amount`` are used.

    Returns
    -------
    (short_df, insider_df) : tuple of pandas.DataFrame
        Independent copies of the filtered frames. The original row index is kept.
    """
    # Fall back to the notebook-level constants when no explicit threshold is given.
    if min_short_float is None:
        min_short_float = ShortFloat
    if min_market_cap is None:
        min_market_cap = MarketCap
    if min_insider_amount is None:
        min_insider_amount = Insider_Amount

    short_df = pd.read_csv(short_float_filepath)
    short_df = short_df.rename(columns={'ShortSqueeze.com Short Interest Data': 'Company Name'})

    # Dropping irrelevant columns (only those actually present in the file)
    columns_to_drop = [
        'Total Short Interest', 'Days to Cover', 'Performance (52-wk)', 'Short: Prior Mo', '% Change Mo/Mo',
        'Shares: Float', 'Avg. Daily Vol.', 'Shares: Outstanding', 'Short Squeeze Ranking™', '% from 52-wk High',
        '(abs)', '% from 200 day MA', '(abs).1', '% from 50 day MA', '(abs).2',
        '% Institutional Ownership',
    ]
    short_df = short_df.drop(columns=[col for col in columns_to_drop if col in short_df.columns])

    # Convert both filter columns first, then filter once and copy, so later
    # assignments never hit a view of the unfiltered frame (SettingWithCopyWarning).
    short_df['Short % of Float'] = pd.to_numeric(short_df['Short % of Float'], errors='coerce')
    short_df['Market Cap'] = pd.to_numeric(short_df['Market Cap'], errors='coerce')
    keep = (short_df['Short % of Float'] >= min_short_float) & (short_df['Market Cap'] >= min_market_cap)
    short_df = short_df[keep].copy()

    insider_df = pd.read_csv(insider_trading_filepath)
    for column in ('Total Amount', 'Share Price'):
        # Strip '$' and thousands separators; unparseable values become NaN
        # (and are therefore excluded by the amount filter below).
        cleaned = insider_df[column].replace({r'\$': '', ',': ''}, regex=True)
        insider_df[column] = pd.to_numeric(cleaned, errors='coerce').astype(float)
    insider_df['Date'] = pd.to_datetime(insider_df['Date'])
    insider_df = insider_df[insider_df['Total Amount'] >= min_insider_amount].copy()

    return short_df, insider_df

In [None]:
# Load both CSVs and apply the short-float, market-cap and insider-amount filters
short_df, insider_df = load_and_preprocess_data(short_float_filepath, insider_trading_filepath)

### 4. Feature Engineering Function

In [None]:
def feature_engineering(short_df, insider_df):
    """Join short-interest records with insider trades and label short squeezes.

    For every insider trade the most recent short-interest record dated at most
    30 days before the trade is kept. Closing prices are then downloaded from
    Yahoo Finance for each trading-day offset in the module-level
    ``Desired_Days`` (offset 1 is the first NYSE session on/after the trade),
    and returns relative to the day-1 close are computed.

    Parameters
    ----------
    short_df : pandas.DataFrame
        Needs 'Symbol', 'Short % of Float', '% Insider Ownership', 'Company Name',
        'Sector', 'Industry' and 'Record Date' (strings such as '2023-JanA').
        The frame is not modified.
    insider_df : pandas.DataFrame
        Needs 'Symbol', 'Total Amount', 'Share Price' and a datetime 'Date'.

    Returns
    -------
    pandas.DataFrame
        One row per (Symbol, Date) with the close/return columns and a binary
        'Short Squeeze' column. Rows containing any NaN are dropped.

    Raises
    ------
    KeyError
        If a 'Record Date' uses a period label missing from the date mapping.

    Notes
    -----
    Performs one network download per row and per entry of ``Desired_Days``.
    """
    date_mapping = {
        'JanA': '01-11', 'JanB': '01-25',
        'FebA': '02-09', 'FebB': '02-27',
        'MarA': '03-09', 'MarB': '03-24',
        'AprA': '04-12', 'AprB': '04-25',
        'MayA': '05-09', 'MayB': '05-24',
        'JunA': '06-09', 'JunB': '06-27',
        'JulA': '07-12', 'JulB': '07-25',
        'AugA': '08-09', 'AugB': '08-24',
        'SepA': '09-12', 'SepB': '09-26',
        'OctA': '10-10', 'OctB': '10-24',
        'NovA': '11-09', 'NovB': '11-27',
        'DecA': '12-11', 'DecB': '12-27',
    }

    def _expand_record_date(match):
        # '2023-JanA' -> '2023-01-11'
        return f'{match.group(1)}-{date_mapping[match.group(2)]}'

    # Work on a copy: converting 'Record Date' in the caller's frame would make a
    # second call fail, because the column would no longer hold strings.
    short_df = short_df.copy()
    # regex=True is mandatory: pandas >= 2 defaults to literal replacement and
    # rejects a callable replacement in that mode.
    short_df['Record Date'] = pd.to_datetime(
        short_df['Record Date'].str.replace(r'(\d{4})-(\w+)', _expand_record_date, regex=True)
    )
    short_df = short_df.sort_values('Record Date').reset_index(drop=True)

    merged_df = pd.merge(short_df, insider_df, on='Symbol')
    merged_df['Share Price'] = merged_df['Share Price'].replace({r'\$': '', ',': ''}, regex=True).astype(float)
    merged_df = merged_df[['Symbol', 'Short % of Float', 'Total Amount', '% Insider Ownership', 'Record Date',
                           'Share Price', 'Company Name', 'Sector', 'Industry', 'Date']].copy()

    # Days elapsed between the short-interest record and the insider trade
    merged_df['Date_diff'] = (merged_df['Date'] - merged_df['Record Date']).dt.days
    # Keep records dated on/before the trade, pick the closest one per trade,
    # then discard trades whose closest record is more than 30 days old.
    merged_df = merged_df[merged_df['Date_diff'] >= 0]
    merged_df = merged_df.sort_values(['Symbol', 'Date_diff'])
    merged_df = merged_df.drop_duplicates(subset=['Symbol', 'Date'], keep='first')
    merged_df = merged_df[merged_df['Date_diff'] <= 30]

    # Reorder columns (also drops 'Record Date', 'Date_diff' and 'Share Price')
    new_column_order = ['Symbol', 'Short % of Float', 'Total Amount', '% Insider Ownership', 'Date',
                        'Company Name', 'Sector', 'Industry']
    merged_df = merged_df[new_column_order].copy()

    # Create new columns for Close Prices at future dates and calculate Returns.
    nyse = mcal.get_calendar('NYSE')

    for day in Desired_Days:
        merged_df[f'Close Price Day {day}'] = np.nan

    for idx, row in merged_df.iterrows():
        trading_days = nyse.valid_days(start_date=row['Date'], end_date=row['Date'] + pd.DateOffset(days=45))

        for day in Desired_Days:
            if day > len(trading_days):
                continue
            session = trading_days[day - 1]
            data = yf.download(row['Symbol'], start=session, end=session + pd.DateOffset(days=1))
            if data.empty:
                continue
            # Take the first close positionally. Flattening copes with 'Close' being
            # either a Series or a single-column DataFrame.
            closes = np.ravel(data['Close'].to_numpy())
            merged_df.loc[idx, f'Close Price Day {day}'] = closes[0]

    # Calculate Returns (percent change versus the day-1 close) and the highest values
    base_close = merged_df['Close Price Day 1']
    for day in Desired_Days:
        merged_df[f'Return ({day} Days)'] = ((merged_df[f'Close Price Day {day}'] - base_close) / base_close) * 100

    merged_df['Highest Day Return'] = merged_df[[f'Return ({day} Days)' for day in Desired_Days]].max(axis=1)
    merged_df['Highest Close Price'] = merged_df[[f'Close Price Day {day}' for day in Desired_Days]].max(axis=1)

    for col in merged_df.columns:
        if 'Close Price' in col or 'Return' in col:
            merged_df[col] = merged_df[col].round(2)

    merged_df = merged_df.dropna().reset_index(drop=True)

    # A row is a short squeeze when any of these return thresholds is reached:
    # >= 10% after 5 or 7 days, >= 15% after 15 days, >= 25% after 30 days.
    is_squeeze = (
        (merged_df['Return (5 Days)'] >= 10)
        | (merged_df['Return (7 Days)'] >= 10)
        | (merged_df['Return (15 Days)'] >= 15)
        | (merged_df['Return (30 Days)'] >= 25)
    )
    merged_df['Short Squeeze'] = is_squeeze.astype('int64')

    return merged_df

In [None]:
# Feature engineering: merge both frames, fetch forward closes and label short squeezes
merged_df = feature_engineering(short_df, insider_df)

In [None]:
# Class balance of the engineered data set: rows labelled 1 versus rows labelled 0
short_squeeze_count = (merged_df['Short Squeeze'] == 1).sum()
no_short_squeeze_count = (merged_df['Short Squeeze'] == 0).sum()

print("Short Squeeze", short_squeeze_count)
print("Non Short Squeeze", no_short_squeeze_count)

In [None]:
# Preview the first rows of the engineered data set
merged_df.head()

In [None]:
# Persist the engineered data set (without the row index)
merged_df.to_csv('Resources/ShortSqueezeData.csv', index=False)

### 5. Data Visualization

In [None]:
# Visualizing Short Squeeze: for every squeezed symbol, plot the tracked closing
# prices of its most recent insider trade and mark the highest one.
Path("Images").mkdir(parents=True, exist_ok=True)  # savefig raises if the folder is missing

short_squeeze_df = merged_df[merged_df['Short Squeeze'] == 1]
grouped_df = short_squeeze_df.groupby('Symbol')
for symbol, data in grouped_df:
    data = data.sort_values('Date')
    fig = plt.figure(figsize=(5, 3))
    days_to_plot = Desired_Days
    # iloc[-1] after sorting by 'Date' selects the latest trade of this symbol
    closing_prices = [data[f'Close Price Day {day}'].iloc[-1] for day in days_to_plot]
    plt.plot(days_to_plot, closing_prices, 'o', color='blue')
    highest_closing_price = max(closing_prices)
    highest_closing_price_day = days_to_plot[closing_prices.index(highest_closing_price)]
    plt.plot(highest_closing_price_day, highest_closing_price, 'ro')
    plt.annotate('Highest Closing Price',
                 xy=(highest_closing_price_day, highest_closing_price),
                 xytext=(highest_closing_price_day + 0.5, highest_closing_price),
                 arrowprops=dict(facecolor='black', arrowstyle='->'),
                 fontsize=8,
                 ha='left')
    plt.xlabel('Day')
    plt.ylabel('Closing Price')
    plt.title(f'Short Squeeze: {symbol}')
    plt.xticks(days_to_plot, [f'Day {day}' for day in days_to_plot])
    plt.grid(True)
    filename = f"Images/{symbol}_close_short_squeeze_plot.png"
    plt.savefig(filename)
    plt.show()
    # Release the figure so one open figure per symbol does not pile up in memory
    plt.close(fig)

In [None]:
# Looking at each short squeeze company 30 days before and after Insider Trading activity occurs
Path("Images").mkdir(parents=True, exist_ok=True)  # savefig raises if the folder is missing

for symbol, data in grouped_df:
    start_date = pd.to_datetime(data['Date'].min()) - pd.DateOffset(days=30)
    end_date = pd.to_datetime(data['Date'].max()) + pd.DateOffset(days=30)
    yf_data = yf.download(symbol, start=start_date, end=end_date)
    if yf_data.empty:
        # Nothing was returned for this symbol/window; skip instead of saving a blank chart
        print(f"No price data for {symbol}; skipping plot")
        continue
    close_prices = yf_data['Close']
    fig = plt.figure(figsize=(4, 3))
    plt.plot(close_prices.index, close_prices.values)
    plt.title(f'{symbol} Short Squeeze')
    plt.xlabel('Date')
    plt.ylabel('Closing Price')
    plt.xticks(rotation=45)  # Rotate the x-axis labels by 45 degrees
    plt.grid(True)
    filename = f"Images/{symbol}_short_squeeze_plot.png"
    plt.savefig(filename)
    plt.show()
    # Release the figure so one open figure per symbol does not pile up in memory
    plt.close(fig)

### 6. One Hot Encoding

In [None]:
def one_hot_encode(df):
    """Replace the categorical columns of *df* with one-hot indicator columns.

    'Company Name', 'Sector' and 'Industry' are encoded with scikit-learn's
    ``OneHotEncoder``; every other column is passed through unchanged.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain the three categorical columns listed above.

    Returns
    -------
    pandas.DataFrame
        The remaining columns of *df* followed by the indicator columns, on the
        same row index as *df*.
    """
    categorical_variables = ['Company Name', 'Sector', 'Industry']

    # `sparse` was renamed to `sparse_output` in scikit-learn 1.2 and removed in 1.4;
    # try the current spelling first and fall back for older installations.
    try:
        encoder = OneHotEncoder(sparse_output=False)
    except TypeError:
        encoder = OneHotEncoder(sparse=False)

    # Reuse df's index so the column-wise concat pairs rows correctly even when
    # df does not carry a default 0..n-1 index.
    encoded_df = pd.DataFrame(
        encoder.fit_transform(df[categorical_variables]),
        columns=encoder.get_feature_names_out(categorical_variables),
        index=df.index,
    )
    numerical_df = df.drop(columns=categorical_variables)
    return pd.concat([numerical_df, encoded_df], axis=1)

In [None]:
# One hot encode the categorical columns ('Company Name', 'Sector', 'Industry')
merged_df = one_hot_encode(merged_df)

In [None]:
# Preview the encoded data set
merged_df.head()

### 7. Train-Test Split

In [None]:
# The 'Short Squeeze' label is computed directly from the forward returns, and the
# close/return columns are only known after the insider trade. Keeping them in X
# would leak the target into the features, so they are removed here.
leakage_columns = [
    col for col in merged_df.columns
    if str(col).startswith(('Close Price Day ', 'Return (', 'Highest '))
]
X = merged_df.drop(columns=['Short Squeeze'] + leakage_columns)
y = merged_df['Short Squeeze']
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

### 8. Data Normalization

In [None]:
# 'Symbol' and 'Date' are not model features, so both are removed before scaling.
# The scaler is fitted on the training split only and then reused for the test split.
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train.drop(columns=['Symbol', 'Date']))
X_test_scaled = scaler.transform(X_test.drop(columns=['Symbol', 'Date']))

### 9. Model Training (FNN)

In [None]:
# Feed-forward binary classifier: 64 -> 32 ReLU units, one sigmoid output
model = Sequential([
    Dense(64, input_dim=X_train_scaled.shape[1], activation='relu'),
    Dense(32, activation='relu'),
    Dense(1, activation='sigmoid'),
])

model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
model.fit(X_train_scaled, y_train, epochs=50, batch_size=32, verbose=2)

### 10. Model Evaluation

In [None]:
# Loss and accuracy of the network on the held-out test split
loss, accuracy = model.evaluate(X_test_scaled, y_test, verbose=0)
print(f'Test Accuracy: {accuracy*100:.2f}%')

### 11. Model Predictions

In [None]:
# Binary classification: sigmoid outputs above 0.5 become class 1, everything else class 0
predictions = (model.predict(X_test_scaled) > 0.5).astype(int).ravel().tolist()

In [None]:
# Put the true labels next to the network's predicted labels for the test split
comparison_df = pd.DataFrame({'Actual': y_test, 'Predicted': predictions})

# Print a random sample of 10 rows from the comparison DataFrame
print(comparison_df.sample(10))

### 12. Performance Metrics

In [None]:
# Classification report followed by the confusion matrix for the network's test predictions
print(classification_report(y_test, predictions))
print(confusion_matrix(y_test, predictions))

In [None]:
# Save the trained network to 'fnn_model'
# NOTE(review): newer Keras releases may require an explicit file extension
# (e.g. '.keras') in the path — confirm against the installed TensorFlow version.
model.save('fnn_model')

# Load the model later (reads back what was written above)
from tensorflow.keras.models import load_model
loaded_model = load_model('fnn_model')

## Alternative Models

### Random Forest vs SMOTE

In [None]:
# Class balance of the training labels before any resampling
y_train.value_counts()

In [None]:
# Build a 100-tree random forest (fixed seed) and fit it on the scaled training split
rf_model = RandomForestClassifier(n_estimators=100, random_state=1).fit(X_train_scaled, y_train)

# Predict labels for the scaled test split
rf_predictions = rf_model.predict(X_test_scaled)

In [None]:
# True labels next to the random forest's predictions; print a random sample of 15 rows
rf_comparison_df = pd.DataFrame({'Actual': y_test, 'Predicted': rf_predictions})
print("Random Forest Results Sample")
print(rf_comparison_df.sample(15))

In [None]:
# Classification report and confusion matrix for the random forest's test predictions
print(classification_report(y_test, rf_predictions))
print(confusion_matrix(y_test, rf_predictions))

In [None]:
# Save the fitted random forest to 'rf_model.joblib'
dump(rf_model, 'rf_model.joblib')

In [None]:
# Load the model later
# `load` is imported here because the notebook's import cell only brings in `dump`;
# without it this cell raises NameError.
from joblib import load
loaded_rf_model = load('rf_model.joblib')

### SMOTE

In [None]:
# Instantiate SMOTE with a fixed seed
smote_sampler = SMOTE(random_state=1, sampling_strategy='minority')

# Resample the scaled training split only; the test split is left untouched
X_resampled, y_resampled = smote_sampler.fit_resample(X_train_scaled, y_train)

In [None]:
# Class balance of the training labels after SMOTE resampling
print(f"SMOTE distribution: {y_resampled.value_counts()}")

In [None]:
# Same random forest configuration as before, this time fitted on the SMOTE-resampled data
smote_model = RandomForestClassifier(n_estimators=100, random_state=1)
smote_model.fit(X_resampled, y_resampled)

# Predict labels for the (non-resampled) scaled test split
smote_predictions = smote_model.predict(X_test_scaled)

In [None]:
# True labels next to the SMOTE-trained forest's predictions; print a random sample of 10 rows
smote_comparison_df = pd.DataFrame({'Actual': y_test, 'Predicted': smote_predictions})
print("SMOTE Results Sample")
print(smote_comparison_df.sample(10))

In [None]:
# Classification report and confusion matrix for the SMOTE-trained forest's test predictions
print(classification_report(y_test, smote_predictions))
print(confusion_matrix(y_test, smote_predictions))

In [None]:
# Save the SMOTE-trained random forest to 'smote_model.joblib'
dump(smote_model, 'smote_model.joblib')

In [None]:
# Load the model later
# `load` is imported here because the notebook's import cell only brings in `dump`;
# without it this cell raises NameError.
from joblib import load
loaded_smote_model = load('smote_model.joblib')