<a href="https://colab.research.google.com/github/thakuraman1011/main/blob/main/colab_notebook.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [14]:
# @title Set environment variables
###########################################################################
import os

# Working folders used by the notebook cells.
_SOURCE_LOCATION = 'company_facts'
_MODIFIED_FACTS = 'modified_facts'   # output of the "Modify and rewrite" cell
_SCRATCH_FOLDER = 'scratch_folder'
_COMPANY_FACTS='company_facts'       # where the SEC archive is extracted

# Only facts whose 'end' date is strictly after this ISO date are kept.
_DATE_THRESHOLD = '2017-12-31'
# A transformed document always carries 'cik' and 'entityName'; it is only
# written out when it has more keys than this.
_MIN_KEYS_IN_MODIFIED_JSON = 2


_URL = 'https://www.sec.gov/Archives/edgar/daily-index/xbrl/companyfacts.zip'
_EMAIL_ADDRESS = "aman.thakur@gmail.com"
# SECURITY: credentials must not live in the notebook source. The password is
# read from the EMAIL_PASSWORD environment variable instead; it is empty when
# the variable is not set. Any password that was previously committed here
# should be revoked.
_EMAIL_PASSWORD = os.environ.get("EMAIL_PASSWORD", "")

# Local file name the SEC archive is downloaded to.
_TEMP_ZIP_FILE = 'company_facts.zip'
###########################################################################

In [None]:
# @title Clear existing folders
import shutil
import os

try:
  # Remove every working folder left over from a previous run, in the same
  # order as before: source facts, modified facts, scratch.
  for folder_name in (_COMPANY_FACTS, _MODIFIED_FACTS, _SCRATCH_FOLDER):
    if not os.path.exists(folder_name):
      continue
    shutil.rmtree(folder_name)
    print(f"Deleted existing folder: {folder_name}")
except Exception as e:
  # Best effort: report the problem and let the notebook continue.
  print(f"Error deleting folders....{e}")


In [None]:
# @title Download and extract
import os
import zipfile
import glob
import urllib.request
from pathlib import Path
from google import colab

def process_sec_data(url=_URL, zip_filename=_TEMP_ZIP_FILE, folder_name=_COMPANY_FACTS):
    """Download and extract company facts from SEC EDGAR.

    This function performs the following steps:
    1. Deletes the target folder if it already exists to ensure a clean state.
    2. Creates a target folder to store the extracted files.
    3. Downloads the ZIP file containing company facts from the specified URL.
    4. Extracts the contents of the ZIP file into the target folder.
    5. Logs the number of JSON files extracted for verification.

    Args:
        url: Address of the ZIP archive to download.
        zip_filename: Local path the archive is saved to.
        folder_name: Folder the archive is extracted into.

    Returns:
        None. Any failure is caught and reported with ``print`` rather than
        raised, so callers cannot tell success from failure by exception.
    """
    import shutil  # local import so this cell does not depend on another cell

    try:
        # 1. Start from a clean folder. Previously the download below was
        #    silently skipped whenever the folder already existed.
        if os.path.exists(folder_name):
            shutil.rmtree(folder_name)
            print(f"Deleted existing folder: {folder_name}")
        os.makedirs(folder_name)
        print(f"Created folder: {folder_name}")

        # 2. Download the zip file
        print(f"Downloading from {url}...")
        headers = {'User-Agent': 'amanthakur@gmail.com'}
        request = urllib.request.Request(url, headers=headers)
        # The context manager guarantees the HTTP response is closed.
        with urllib.request.urlopen(request) as response:
            if response.getcode() != 200:
                raise ValueError(f"Failed to download from URL: {url}")
            with open(zip_filename, 'wb') as f:
                # Streams the body in chunks instead of loading it in memory.
                shutil.copyfileobj(response, f)

        # 3. Extract the contents
        print(f"Extracting to {folder_name}...")
        with zipfile.ZipFile(zip_filename, 'r') as zip_ref:
            zip_ref.extractall(folder_name)

        file_count = sum(1 for _ in Path(folder_name).glob('*.json'))
        print(f"Extracted {file_count} files.")

    except Exception as e:
        print(f"An error occurred: {e}")

# Run the download/extract step with the default URL, zip name and folder.
process_sec_data()
# NOTE: process_sec_data() prints errors instead of raising them, so this
# message is printed whether or not the download actually succeeded.
print('download from sec and extraction completed')


In [None]:
# @title Modify and rewrite
import os
import glob
from pathlib import Path
from datetime import date
import json

def _transform_instance(json_data):
  """
  Transforms SEC json file structure to a more flat structure and also filters
  facts based on end date and form type.

  Transformed structure is {cik, entityName, <element name>: [fact, ...]}
  where each fact is {end, val, form, accn, fp, filed, unit}.

  Rules applied:
  * Elements are taken from facts['ifrs-full'] when it is non-empty,
    otherwise from facts['us-gaap'].
  * Only the first unit of an element is used; elements with no unit are
    skipped.
  * A fact is kept when its 'end' date is after _DATE_THRESHOLD and its form
    is one of 10-K, 10-Q, 20-F or 6-K.
  * Only the first kept fact per 'end' date is retained.

  Returns the transformed dictionary, or None when it would not contain more
  than _MIN_KEYS_IN_MODIFIED_JSON keys (i.e. no element survived filtering).
  Raises KeyError if 'cik', 'entityName' or a required fact field is missing.
  """
  # Parse the threshold once instead of once per fact.
  threshold = date.fromisoformat(_DATE_THRESHOLD)
  allowed_forms = ('10-K', '10-Q', '20-F', '6-K')

  new_json = {
      'cik': str(json_data['cik']).zfill(10),
      'entityName': json_data['entityName'],
  }

  taxonomies = json_data.get('facts', {})
  elements = taxonomies.get('ifrs-full') or taxonomies.get('us-gaap') or {}

  for element_key, element_value in elements.items():
    units = element_value.get('units', {})
    if not units:
      # Without this guard next() below raises StopIteration, which made the
      # caller discard the whole company file.
      continue
    # Assumption carried over from the original code: the units dictionary
    # has only one item, so only the first one is read.
    # key is EUR/USD/SHARES and facts is a list.
    key, facts = next(iter(units.items()))

    fact_list = []
    seen_end_dates = set()
    for fact in facts:
      end = fact['end']
      if end in seen_end_dates:
        continue
      if date.fromisoformat(end) <= threshold or fact['form'] not in allowed_forms:
        continue
      fact_list.append({
          'end': end,
          'val': fact['val'],
          'form': fact['form'],
          'accn': fact['accn'],
          'fp': fact['fp'],
          'filed': fact['filed'],
          'unit': key,
      })
      seen_end_dates.add(end)

    # Add element only if there was at least one fact.
    if fact_list:
      new_json[element_key] = fact_list

  # if the only facts are CIK and entityName then return None
  if len(new_json) > _MIN_KEYS_IN_MODIFIED_JSON:
    return new_json
  return None

def transform(file_name = None, source_dir=_COMPANY_FACTS, dest_dir=_MODIFIED_FACTS):
  """
  Transform the JSON files of source_dir and write the results to dest_dir.

  For every matching file:
  1. Read file
  2. Load to json
  3. Transform it with _transform_instance
  4. Write the modified json to dest_dir under the same file name

  Files for which _transform_instance returns nothing are skipped, as are
  files which result in errors; both cases are reported with print.

  file_name: optional glob pattern (or plain file name) relative to
             source_dir; when omitted every "*.json" file is processed.
  source_dir: folder holding the original SEC files.
  dest_dir: folder receiving the transformed files (created if missing).
  """
  src = Path(source_dir)
  dest = Path(dest_dir)
  # Create destination directory if it doesn't exist
  dest.mkdir(parents=True, exist_ok=True)

  pattern = file_name if file_name else "*.json"

  for file_path in src.glob(pattern):
    try:
      # Close the source file before writing: the original kept it open and
      # re-used the same handle name for the output file.
      with open(file_path, 'r', encoding='utf-8') as source_file:
        json_data = json.load(source_file)

      new_json_data = _transform_instance(json_data)
      if not new_json_data:
        print(f"skipping original json {file_path.name}")
        continue

      with open(dest / file_path.name, 'w', encoding='utf-8') as dest_file:
        json.dump(new_json_data, dest_file)
    except Exception as e:
      # Best effort by design: one bad file must not stop the batch.
      print(f"Error processing {file_path.name}: {e}")


########################## Execution block ################################
# Transform every *.json file using the default source/destination folders.
transform()
# NOTE: transform() reports per-file errors with print and keeps going, so
# this message is printed even if some files were skipped or failed.
print('modified files created')


In [None]:
# @title Download zip files
# Shell escape: pack the transformed files into a single archive.
# NOTE(review): the folder and archive names are hard-coded here; they match
# _MODIFIED_FACTS only while that constant is 'modified_facts'.
!zip -r modified_facts.zip "modified_facts"

from google.colab import files
# Offer both the transformed archive and the original SEC archive for download.
files.download('modified_facts.zip')
files.download(_TEMP_ZIP_FILE)




In [None]:
# @title Delete files and folders no longer needed
import shutil
import os
from pathlib import Path

try:
  # The extracted SEC folder goes first.
  folder_name = _COMPANY_FACTS
  if os.path.exists(folder_name):
    shutil.rmtree(folder_name)
    print(f"Deleted existing folder: {folder_name}")

  # Then every archive the notebook may have produced, in the original order.
  for zip_name in (_TEMP_ZIP_FILE, f"{_COMPANY_FACTS}.zip", f"{_MODIFIED_FACTS}.zip"):
    file_path = Path(zip_name)
    if file_path.exists():
      file_path.unlink()
      print(f"Deleted file: {file_path}")

except Exception as e:
  # Best effort: report the problem and let the notebook continue.
  print(f"Error deleting folders....{e}")

Error deleting folders....name '_TEMP_ZIP_FILE' is not defined


In [20]:
# @title Debugging
import json
import os
from collections import defaultdict
from pathlib import Path
import random

def get_cik(json_data):
  """Return json_data's 'cik' entry, or "" when json_data is empty or None."""
  if not json_data:
    return ""
  return json_data.get('cik')


def get_json_for_file(f):
  """
  Parse JSON from the open file object f.

  Returns the parsed value. An empty/falsy value is still returned, after
  printing a message. Returns None (after printing the error) when parsing
  fails.
  """
  try:
    parsed = json.load(f)
  except Exception as e:
    print(f"Error reading filename {f}")
    print(f"Error: {e}")
    return None

  if not parsed:
    print(f"Error reading filename {f}")
  return parsed

def has_all_elements(element_names,json_data):
  """
  Return True when every name in element_names is a key of json_data.

  json_data may be None (get_json_for_file returns None for a file that
  could not be parsed); in that case the answer is False instead of a
  TypeError.
  """
  if json_data is None:
    return False
  return set(element_names).issubset(json_data)

def has_element(element_name,json_data):
  """Return True when element_name is a key of json_data (single-name form of has_all_elements)."""
  wanted = [element_name]
  return has_all_elements(wanted, json_data)

def _lists_have_same_elements(list1,list2):
  """Return True when both lists hold the same distinct items (order and repeats ignored)."""
  first, second = set(list1), set(list2)
  return first <= second and second <= first

def get_json_files():
  """Return a lazy iterator over the "*.json" paths directly inside _MODIFIED_FACTS."""
  return Path(_MODIFIED_FACTS).glob("*.json")

def get_ciks_with_element(element_name, break_at=100):
  """
  List companies whose transformed file contains element_name.

  Scans the files returned by get_json_files() and stops once break_at
  matches were collected. Files that cannot be parsed or are empty are
  skipped (get_json_for_file has already reported them).

  Returns a list of {'cik': ..., 'entityName': ...} dictionaries.
  """
  ciks = []
  for file_path in get_json_files():
    if len(ciks) >= break_at:
      break
    with open(file_path, 'r') as f:
      json_data = get_json_for_file(f)
    if not json_data:
      # Previously a None document crashed the whole scan with a TypeError.
      continue
    if has_element(element_name, json_data):
      ciks.append({'cik': json_data['cik'], 'entityName': json_data['entityName']})
  return ciks

def get_ciks_with_all_elements(element_names, break_at=100):
  """
  List companies whose transformed file contains every name in element_names.

  Scans the files returned by get_json_files() and stops once break_at
  matches were collected. Files that cannot be parsed or are empty are
  skipped (get_json_for_file has already reported them).

  Returns a list of {'cik': ..., 'entityName': ...} dictionaries.
  """
  ciks = []
  for file_path in get_json_files():
    if len(ciks) >= break_at:
      break
    with open(file_path, 'r') as f:
      json_data = get_json_for_file(f)
    if not json_data:
      # Previously a None document crashed the whole scan with a TypeError.
      continue
    if has_all_elements(element_names, json_data):
      ciks.append({'cik': json_data['cik'], 'entityName': json_data['entityName']})
  return ciks


def get_ciks_without_element(element_name, break_at=100):
  """
  List companies whose transformed file does NOT contain element_name.

  Scans the files returned by get_json_files() and stops once break_at
  matches were collected. Files that cannot be parsed or are empty are
  skipped rather than counted as "without" (they have no cik/entityName to
  report, and get_json_for_file has already printed a message for them).

  Returns a list of {'cik': ..., 'entityName': ...} dictionaries.
  """
  ciks = []
  for file_path in get_json_files():
    if len(ciks) >= break_at:
      break
    with open(file_path, 'r') as f:
      json_data = get_json_for_file(f)
    if not json_data:
      # Previously a None/empty document crashed the whole scan.
      continue
    if not has_element(element_name, json_data):
      ciks.append({'cik': json_data['cik'], 'entityName': json_data['entityName']})
  return ciks

def has_all_elements_with_same_period(element_names,json_data):
  """
  Check whether all element_names share at least one common 'end' date.

  json_data is a transformed document: each element name maps to a list of
  facts, and each fact has an 'end' key.

  Returns (end_date, True) for the first end date (in first-seen order) for
  which every name in element_names has a fact, otherwise (None, False).
  (None, False) is also returned when json_data is None/empty, when an
  element is missing from it, or when element_names is empty.
  """
  required = set(element_names)
  if not json_data or not required.issubset(json_data):
    return None, False

  # end date -> names of the elements that have a fact ending on that date.
  # Built once; the original rebuilt this mapping inside the element loop.
  names_by_date = defaultdict(set)
  for element_name in element_names:
    for fact in json_data[element_name]:
      names_by_date[fact['end']].add(element_name)

  # Dictionaries keep insertion order, so the first match is the earliest
  # seen date, exactly as before.
  for end_date, names in names_by_date.items():
    if names == required:
      return end_date, True
  return None, False

def get_ciks_has_one_not_the_other(has,has_not, break_at=100):
  """
  List companies whose transformed file contains element `has` but not
  element `has_not`.

  Scans the files returned by get_json_files() and stops once break_at
  matches were collected. Files that cannot be parsed or are empty are
  skipped (get_json_for_file has already reported them).

  Returns a list of {'cik': ..., 'entityName': ...} dictionaries.
  """
  ciks = []
  for file_path in get_json_files():
    if len(ciks) >= break_at:
      break
    with open(file_path, 'r') as f:
      json_data = get_json_for_file(f)
    if not json_data:
      # Previously a None document crashed the whole scan with a TypeError.
      continue
    if has_element(has, json_data) and not has_element(has_not, json_data):
      ciks.append({'cik': json_data['cik'], 'entityName': json_data['entityName']})
  return ciks


def get_ciks_with_all_elements_for_same_period(element_names, break_at=100):
  """
  List companies that report every name in element_names for one common
  period end date.

  Scans the files returned by get_json_files() and stops once break_at
  matches were collected. Files that cannot be parsed or are empty are
  skipped (get_json_for_file has already reported them).

  Returns a list of {'cik': ..., 'entityName': ...} dictionaries.
  """
  ciks = []
  for file_path in get_json_files():
    if len(ciks) >= break_at:
      break
    with open(file_path, 'r') as f:
      json_data = get_json_for_file(f)
    if not json_data:
      continue
    # Bug fix: the original passed the module-level `element_name` here, so
    # the `element_names` argument was ignored.
    _period, found = has_all_elements_with_same_period(element_names, json_data)
    if found:
      ciks.append({'cik': json_data['cik'], 'entityName': json_data['entityName']})
  return ciks

def call_function(parameter1, parameter2,func):
  """
  Call func with whichever of parameter1/parameter2 is not None.

  * both given      -> func(parameter1, parameter2)
  * only one given  -> func(<that one>)
  * neither given   -> func()

  Only None counts as "not given". The original mixed truthiness tests with
  `== None`, so falsy values such as 0, '' or [] matched no branch and the
  function returned None without ever calling func.

  Returns whatever func returns.
  """
  supplied = [value for value in (parameter1, parameter2) if value is not None]
  return func(*supplied)

def print_per_line(param):
  """Print each item of a list on its own line; print any other value as-is."""
  items = param if isinstance(param, list) else [param]
  for item in items:
    print(item)

# Element names used by the multi-element helpers.
element_names =['CashAndCashEquivalentsAtCarryingValue','CashCashEquivalentsRestrictedCashAndRestrictedCashEquivalents'] #'CashAndCashEquivalentsAtCarryingValue',
# Single element name, for the single-element helpers.
element_name = 'AmortizationOfIntangibleAssets'

# Helpers defined in this cell (swap the one passed to call_function below):
# get_ciks_with_element
# get_ciks_with_all_elements
# get_ciks_without_element
# has_all_elements_with_same_period
# get_ciks_has_one_not_the_other
# get_ciks_with_all_elements_for_same_period

ciks = None
# Second argument is None, so call_function invokes the helper with
# element_names only; the helper's break_at keeps its default.
ciks = call_function(element_names, None, get_ciks_with_all_elements_for_same_period)
# One matching company per output line.
print_per_line(ciks)



In [None]:
# @title Generate Element frequency CSV
import os
import glob
import json
from collections import Counter
from pathlib import Path
import csv

def write_element_frequency():
  """
  Count in how many transformed files each element appears and write the
  result to "element_frequency.csv" (columns: Element, Frequency).

  Every "*.json" file in _MODIFIED_FACTS is read; the 'cik' and 'entityName'
  keys are not counted.
  """
  ignored = {"cik", "entityName"}
  element_counts = Counter()

  for path in Path(_MODIFIED_FACTS).glob('*.json'):
    with open(path) as f:
      data = json.load(f)
    # Each top-level key appears once per file, so this adds 1 per element.
    element_counts.update(key for key in data if key not in ignored)

  csv_file = "element_frequency.csv"
  with open(csv_file, 'w', newline='') as f:
    writer = csv.writer(f)
    writer.writerow(["Element", "Frequency"])
    writer.writerows(element_counts.items())

# Build element_frequency.csv from the files in the modified-facts folder.
write_element_frequency()
print('element frequency csv created')



element frequency csv created


In [None]:
# @title Scratchpad
from pathlib import Path
import os
import json
import shutil


def move_from_company_to_scratch(file_name):
  """
  Copy file_name from the _COMPANY_FACTS folder into 'scratch_folder'.

  Despite the name, the source file is copied, not moved. The scratch
  folder is created when it does not exist yet.
  """
  source = Path(f"{_COMPANY_FACTS}/{file_name}")
  scratch_dir = 'scratch_folder'
  if not os.path.exists(scratch_dir):
    os.makedirs(scratch_dir)
  shutil.copy(source, Path(scratch_dir))




# File inspected by this scratchpad cell.
SCRATCH_FILE = 'CIK0001333141.json'
scratch_file = Path(f"{_COMPANY_FACTS}/{SCRATCH_FILE}")
src_folder = Path(_COMPANY_FACTS)
dest_folder = Path(_MODIFIED_FACTS)

############## To print size of folders ##############
source_count = sum(1 for _ in src_folder.glob('*.json'))
dest_count = sum(1 for _ in dest_folder.glob('*.json'))

print(f"Source folder has {source_count} files.")
print(f"Destination folder has {dest_count} files.")

####### to dump contents of SCRATCH FILE #############
# Always start from an empty scratch folder. The folder used to be recreated
# only when it already existed, so on a first run the write below failed
# because 'scratch_folder' was missing.
if os.path.exists('scratch_folder'):
  shutil.rmtree('scratch_folder')
os.makedirs('scratch_folder')

# The transformed copy of the file is read (from _MODIFIED_FACTS); the
# message now names that folder instead of _COMPANY_FACTS.
with open(f'{_MODIFIED_FACTS}/{SCRATCH_FILE}', 'r') as f:
  print(f"Contents of {SCRATCH_FILE}:")
  print(f"reading....{SCRATCH_FILE} from {_MODIFIED_FACTS}")
  json_data = json.load(f)
with open(f'scratch_folder/{SCRATCH_FILE}','w') as f:
  # Message fixed to show the real destination path (folder/file).
  print(f"writing....{'scratch_folder'}/{SCRATCH_FILE}")
  json.dump(json_data, f)
######################################################
######################################################



###########################################################################
def send_notification(final_dictionary):
  # Write final_dictionary (element -> frequency) to a CSV file and e-mail
  # that file as an attachment from _EMAIL_ADDRESS to _EMAIL_ADDRESS.
  #
  # final_dictionary: mapping whose items become the CSV rows.
  # Returns None. SMTP failures are caught and printed, not raised.
  #
  # NOTE(review): _CSV_FILENAME_PREFIX is not defined in the visible cells;
  # confirm it is set before calling this function.
  # NOTE(review): the cell ends with a stray triple quote, so this function
  # may be intentionally disabled; that is why plain comments are used here
  # instead of a docstring.

  # Local imports: this cell does not import these modules itself.
  import csv
  import smtplib
  from email.message import EmailMessage

  print('writing CSV file')
  csv_file_name = f"{_CSV_FILENAME_PREFIX}{_DATE_THRESHOLD}.csv"
  with open(csv_file_name, "w", newline="") as f:
    writer = csv.writer(f)
    writer.writerow(['Element','Frequency']) # header row
    for key,value in final_dictionary.items():
      writer.writerow([key, value])

  msg = EmailMessage()
  msg['Subject'] = 'colab notification'
  msg['From'] = _EMAIL_ADDRESS
  msg['To'] = _EMAIL_ADDRESS

  # Re-read the CSV as bytes so it can be attached as a generic binary file.
  with open(csv_file_name, 'rb') as f:
    file_data = f.read()
    file_name = f.name
  msg.add_attachment(file_data, maintype='application', subtype='octet-stream', filename=file_name)

  try:
    with smtplib.SMTP_SSL('smtp.gmail.com', 465) as smtp:
      smtp.login(_EMAIL_ADDRESS, _EMAIL_PASSWORD)
      smtp.send_message(msg)
    print('email sent')
  except Exception as e:
    print(f"Error: {e}")

"""

In [2]:
# @title Upload and unzip to modified facts folder from local
import os
import zipfile
from google.colab import files

def upload_and_unzip(target_folder=_MODIFIED_FACTS):
    """Upload a zip file through the Colab file picker and extract it.

    The archive is extracted into ``target_folder`` (created when missing).
    Only the first uploaded file is used. The uploaded zip is removed from
    the working directory afterwards, whether or not extraction succeeded.

    Returns None; problems are reported with ``print``.
    """
    # Make sure the destination exists before asking for the upload.
    folder_missing = not os.path.exists(target_folder)
    if folder_missing:
        os.makedirs(target_folder)
        print(f"Created folder: {target_folder}")

    print("Please select the zip file to upload:")
    uploaded = files.upload()
    if not uploaded:
        print("No file was uploaded.")
        return

    # Assuming only one file is uploaded at a time: take the first name.
    zip_filename = next(iter(uploaded))
    print(f"Uploaded file: {zip_filename}")

    try:
        with zipfile.ZipFile(zip_filename, 'r') as archive:
            archive.extractall(target_folder)
        print(f"Successfully extracted '{zip_filename}' to '{target_folder}'.")
    except zipfile.BadZipFile:
        print(f"Error: '{zip_filename}' is not a valid zip file.")
    except Exception as e:
        print(f"An error occurred during extraction: {e}")
    finally:
        # The uploaded archive is no longer needed once extraction was tried.
        os.remove(zip_filename)
        print(f"Deleted uploaded zip file: {zip_filename}")

# Prompt for a zip upload and extract it into the default target folder
# (_MODIFIED_FACTS).
upload_and_unzip()

Created folder: modified_facts
Please select the zip file to upload:


Saving modified_facts.zip to modified_facts.zip
Uploaded file: modified_facts.zip
Successfully extracted 'modified_facts.zip' to 'modified_facts'.
Deleted uploaded zip file: modified_facts.zip
