In [25]:
#################################################################
import magic  # pip install python-magic-bin
from zipfile import ZipFile, BadZipFile
from bs4 import BeautifulSoup
import pandas as pd
import requests
import shutil
import time
import os
import re

# Set folders for storing data
BASE_FOLDER = 'sec_13f_data'
ANNUAL_FOLDER = 'sec_13f_annual'
os.makedirs(BASE_FOLDER, exist_ok=True)
os.makedirs(ANNUAL_FOLDER, exist_ok=True)

# Step 1: Get SEC ZIP Links
def get_zip_links(start_year, end_year):
    url = "https://www.sec.gov/data-research/sec-markets-data/form-13f-data-sets"
    headers = {
        "User-Agent": "MyAppName/1.0 (myemail@example.com)",
        "Accept-Language": "en-US,en;q=0.9",
        "Referer": "https://www.sec.gov/",
    }
    try:
        response = requests.get(url, headers=headers, timeout=10)
        response.raise_for_status()
        soup = BeautifulSoup(response.content, 'html.parser')
        zip_links = []
        
        for year in range(start_year, end_year + 1):
            for link in soup.find_all('a', href=True):
                href = link['href']
                if year >= 2024:
                    if href.endswith('.zip') and re.search(rf"{year}", href):
                        zip_links.append(f"https://www.sec.gov{href}")
                else:
                    if href.endswith('.zip') and f"{year}q" in href:
                        zip_links.append(f"https://www.sec.gov{href}")
        
        if not zip_links:
            raise Exception("[ERROR] No matching ZIP files found.")
        print(f"[INFO] Found {len(zip_links)} ZIP files and start to download and process.")
        return zip_links
    
    except requests.RequestException as e:
        raise Exception(f"[ERROR] Failed to fetch ZIP links: {e}")

# Step 2: Download ZIP Files
def is_valid_zip(file_path):
    mime_type = magic.Magic(mime=True).from_file(file_path)
    return mime_type == 'application/zip'

def download_zip(zip_url, output_folder):
    os.makedirs(output_folder, exist_ok=True)
    local_zip = os.path.join(output_folder, os.path.basename(zip_url))
    
    headers = {
        "User-Agent": "MyAppName/1.0 (myemail@example.com)"
    }
    response = requests.get(zip_url, headers=headers, stream=True)
    response.raise_for_status()
    
    with open(local_zip, 'wb') as f:
        for chunk in response.iter_content(chunk_size=1024):
            if chunk:
                f.write(chunk)
    
    if not is_valid_zip(local_zip):
        os.remove(local_zip)
        raise Exception("[ERROR] Downloaded file is not a valid ZIP.")
    
    time.sleep(2)
    return local_zip

# Step 3: Extract ZIP Files
def extract_zip(zip_path, extract_to):
    try:
        with ZipFile(zip_path, 'r') as zip_ref:
            zip_ref.extractall(extract_to)
    except BadZipFile:
        raise Exception("[ERROR] Invalid ZIP file.")

# Step 4: Process and Parsing TSV and convert to CSV
def process_tsv_files(folder, output_csv):
    infotable_df = None
    
    # Load INFOTABLE as the base
    for file in os.listdir(folder):
        if file.endswith('.tsv'):
            tsv_path = os.path.join(folder, file)
            try:
                df = pd.read_csv(tsv_path, sep='\t', encoding='utf-8', engine='python')
                if 'INFOTABLE' in file:
                    infotable_df = df.copy()
            except Exception as e:
                print(f"[ERROR] Failed to process {file}: {e}")
    
    if infotable_df is None:
        raise ValueError("[ERROR] No INFOTABLE file found.")
    
    # Extract CIK
    if 'ACCESSION_NUMBER' in infotable_df.columns:
        infotable_df['CIK'] = infotable_df['ACCESSION_NUMBER'].str.split('-').str[0]
    else:
        raise ValueError("[ERROR] ACCESSION_NUMBER column is missing.")
    
    # Merge Other TSV Files into INFOTABLE
    for file in os.listdir(folder):
        if file.endswith('.tsv') and 'INFOTABLE' not in file:
            tsv_path = os.path.join(folder, file)
            try:
                df = pd.read_csv(tsv_path, sep='\t', encoding='utf-8', engine='python')
                if 'ACCESSION_NUMBER' in df.columns:
                    common_columns = set(infotable_df.columns).intersection(df.columns)
                    df = df.drop(columns=[col for col in common_columns if col != 'ACCESSION_NUMBER'])
                    infotable_df = pd.merge(
                        infotable_df, df, on='ACCESSION_NUMBER', how='left'
                    )
            except Exception as e:
                print(f"[ERROR] Failed to merge {file}: {e}")
    
    # Remove Duplicate Rows Based on Critical Subset
    subset_columns = [
        'CIK', 'FILINGMANAGER_NAME', 'SUBMISSIONTYPE', 'PERIODOFREPORT', 'FILING_DATE',
        'NAMEOFISSUER', 'TITLEOFCLASS', 'CUSIP', 'VALUE', 'SSHPRNAMT'
    ]
    subset_columns = [col for col in subset_columns if col in infotable_df.columns]
    before_dedup = len(infotable_df)
    infotable_df.drop_duplicates(subset=subset_columns, inplace=True)
    after_dedup = len(infotable_df)
    
    # Final Column Mapping
    columns_to_keep = {
        'CIK': 'cik',
        'FILINGMANAGER_NAME': 'coname',
        'SUBMISSIONTYPE': 'form',
        'PERIODOFREPORT': 'rdate',
        'FILING_DATE': 'fdate',
        'NAMEOFISSUER': 'nameOfIssuer',
        'TITLEOFCLASS': 'titleOfClass',
        'CUSIP': 'cusip',
        'VALUE': 'value',
        'SSHPRNAMT': 'sshPrnamt',
        'SSHPRNAMTTYPE': 'sshPrnamtType',
        'PUTCALL': 'putCall',
        'INVESTMENTDISCRETION': 'investmentDiscretion',
        'OTHERMANAGER': 'otherManager',
        'VOTING_AUTH_SOLE': 'Sole',
        'VOTING_AUTH_SHARED': 'Shared',
        'VOTING_AUTH_NONE': 'None'
    }
    
    # Create final_df with selected columns
    final_df = infotable_df[[col for col in columns_to_keep.keys() if col in infotable_df.columns]].copy()
    final_df.rename(columns=columns_to_keep, inplace=True)
    
    # Save the final CSV
    final_df.to_csv(output_csv, index=False)
    print(f"[INFO] Saved 13F CSV: {output_csv}")
    
    # Post-process date columns in the saved CSV
    postprocess_csv_dates(output_csv)
    
    # Cleanup temporary files
    shutil.rmtree(folder)

# Step 5: Main Workflow
def main():
    start_year = int(input("Enter start year: "))
    end_year = int(input("Enter end year: "))
    zip_links = get_zip_links(start_year, end_year)
    
    # Ensure the user enters a valid range
    if start_year < 2013 or end_year > 2030 or start_year > end_year:
        print("[ERROR] Invalid year range. Please enter a valid range between 2013 and 2030.")
        return
    
    for zip_link in zip_links:
        zip_folder = os.path.join(BASE_FOLDER, os.path.basename(zip_link).replace('.zip', ''))
        zip_path = download_zip(zip_link, zip_folder)
        extract_zip(zip_path, zip_folder)
        process_tsv_files(zip_folder, os.path.join(BASE_FOLDER, f"{os.path.basename(zip_link).replace('.zip', '')}.csv"))
    
    # Perform annual aggregation after processing all CSVs
    aggregate_csv_by_year()
    # Sort every csv file by rdate and cik
    sort_csv_files()

# Step 6: Post-Process Dates in Final CSV
def postprocess_csv_dates(csv_file):
    """
    Ensures date columns (rdate and fdate) are formatted as YYYY-MM-DD after CSV export.
    """
    try:
        # Explicitly define column types to handle mixed data types
        dtype_mapping = {
            'rdate': 'str',
            'fdate': 'str'
        }
        
        df = pd.read_csv(csv_file, dtype=dtype_mapping, low_memory=False)
        
        # Explicitly format date columns
        for date_col in ['rdate', 'fdate']:
            if date_col in df.columns:
                df[date_col] = pd.to_datetime(
                    df[date_col], format='%d-%b-%Y', errors='coerce'
                ).dt.strftime('%Y-%m-%d')
        
        # Save the CSV again with updated date formats
        df.to_csv(csv_file, index=False)    
    except Exception as e:
        print(f"[ERROR] Failed to format dates in final CSV: {e}")

# Step 7: Aggregate Final CSVs by Year Based on fdate
def aggregate_csv_by_year():
    """
    Aggregate all final CSV files by year based on the 'fdate' column.
    """
    try:
        all_files = [os.path.join(BASE_FOLDER, f) for f in os.listdir(BASE_FOLDER) if f.endswith('.csv')]
        annual_data = {}
        
        for file in all_files:
            df = pd.read_csv(file, low_memory=False)
            
            # Ensure 'fdate' is in the correct datetime format
            if 'fdate' in df.columns:
                df['fdate'] = pd.to_datetime(df['fdate'], errors='coerce')
                df['year'] = df['fdate'].dt.year  # Extract year from fdate
                
                # Group data by year
                for year, group in df.groupby('year'):
                    if year is not pd.NaT and not pd.isna(year):
                        if year not in annual_data:
                            annual_data[year] = group.copy()
                        else:
                            annual_data[year] = pd.concat([annual_data[year], group], ignore_index=True)
        
        # Save annual aggregated data
        for year, df in annual_data.items():
            output_file = os.path.join(ANNUAL_FOLDER, f"SEC_13F_Holdings_in_fdate_{year}.csv")
            df.drop(columns=['year'], inplace=True)
            df.to_csv(output_file, index=False)
            print(f"[INFO] Saved annual aggregated CSV for filling year {year}: {output_file}")
    
    except Exception as e:
        print(f"[ERROR] Failed during annual aggregation: {e}")

# Step 8: Sort every csv file by rdate and cik in annual folder and base folder
def sort_csv_files():
    csv_files = [f for f in os.listdir(BASE_FOLDER) if f.endswith('.csv')]
    for file in csv_files:
        df = pd.read_csv(os.path.join(BASE_FOLDER, file), low_memory=False)
        df.sort_values(by=['rdate', 'cik'], inplace=True)
        df.to_csv(os.path.join(BASE_FOLDER, file), index=False)
    
    csv_files = [f for f in os.listdir(ANNUAL_FOLDER) if f.endswith('.csv')]
    for file in csv_files:
        df = pd.read_csv(os.path.join(ANNUAL_FOLDER, file), low_memory=False)
        df.sort_values(by=['rdate', 'cik'], inplace=True)
        df.to_csv(os.path.join(ANNUAL_FOLDER, file), index=False)

# Run the main workflow
if __name__ == "__main__":
    main()


[INFO] Found 4 ZIP files and start to download and process.
[ERROR] Invalid year range. Please enter a valid range between 2013 and 2030.


In [23]:
import pandas as pd
import warnings
import glob
import os
warnings.simplefilter(action='ignore', category=pd.errors.DtypeWarning)

# Combine all CSV files in the annual folder
print("[INFO] Combining all annual CSV files")
all_files = glob.glob('sec_13f_annual/*.csv')
df_list = [pd.read_csv(f, low_memory=False) for f in all_files]
combined_df = pd.concat(df_list, ignore_index=True)
sorted_df = combined_df.sort_values(by='rdate')
sorted_df.to_csv('Combined_sorted_13F.csv', index=False)
# Filter 13F fillings and save as compressed file
input_df = pd.read_csv('Combined_sorted_13F.csv')
filtered_df = input_df[(input_df['rdate'] >= '2013-03-31') & (input_df['rdate'] <= '2023-12-31')]
filtered_df.to_csv('13F_fillings.csv.gz', compression='gzip', index=False)
os.rename('13F_fillings.csv.gz', '13F_fillings.gz')

print("[INFO] Filtered 13F fillings saved as CSV.")


[INFO] Combining all annual CSV files
[INFO] Filtered 13F fillings saved as CSV.
