In [15]:

#Mount Google drive
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [16]:
import os
import pandas as pd
import requests
import time
from tqdm import tqdm

from bs4 import BeautifulSoup
import nltk
import re
import string

In [17]:
nltk.download('punkt')
nltk.download('stopwords')
nltk.download('wordnet')

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package wordnet to /root/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!


True

In [18]:
BASE_URL = "https://www.sec.gov/Archives/"

# SEC requires this - read the "Fair access" section at https://www.sec.gov/os/accessing-edgar-data
# The name and email can be dummy
USER_AGENT = "Nisha nisha@gmail.com" 
headers = {"User-Agent": USER_AGENT}

DOWNLOAD_DELAY = 0.2 # SEC allows max 10 requests per second, hence this should be greater than 1/10 = 0.1s

DATA_DIR = "drive/MyDrive/Blackcoffer" # Path to directory containing excel files

CIK_DATA_PATH = os.path.join(DATA_DIR,"cik_list.xlsx")

SEC_DATA_DIR = os.path.join(DATA_DIR,"sec_data") # Where to download txt files

OUTPUT_FILE_PATH = os.path.join(DATA_DIR,"output.xlsx")


MASTER_DICTIONARY_PATH = os.path.join(DATA_DIR,"LoughranMcDonald_MasterDictionary_2020.xlsx")

STOPWORDS_GENERIC_PATH      = os.path.join(DATA_DIR,"StopWords_Generic.txt")
STOPWORDS_GENERIC_LONG_PATH = os.path.join(DATA_DIR,"StopWords_GenericLong.txt")

CONSTRAINING_DICTIONARY_PATH = os.path.join(DATA_DIR,"constraining_dictionary.xlsx")
UNCERTAINITY_DICTIONARY_PATH = os.path.join(DATA_DIR,"uncertainty_dictionary.xlsx")

In [19]:
master_dictionary = pd.read_excel(MASTER_DICTIONARY_PATH)

In [24]:
master_dictionary.sample(3)

Unnamed: 0,Word,Seq_num,Word Count,Word Proportion,Average Proportion,Std Dev,Doc Count,Negative,Positive,Uncertainty,Litigious,Strong_Modal,Weak_Modal,Constraining,Complexity,Syllables,Source
8940,BROKENNESSES,8941,0,0.0,0.0,0.0,0,0,0,0,0,0,0,0,0,4,12of12inf
52792,OVERPARTICULAR,52796,0,0.0,0.0,0.0,0,0,0,0,0,0,0,0,0,6,12of12inf
14583,CONCEITEDLY,14584,0,0.0,0.0,0.0,0,0,0,0,0,0,0,0,0,4,12of12inf


In [25]:
positive_words = master_dictionary[master_dictionary["Positive"]>0]["Word"]
positive_words = positive_words.apply(lambda x:str(x).lower()).tolist()
positive_words = set(positive_words) # Sets are better than lists for searching

 
negative_words = master_dictionary[master_dictionary["Negative"]>0]["Word"]
negative_words = negative_words.apply(lambda x:str(x).lower()).tolist()
negative_words = set(negative_words)

In [26]:
constraining_words = set(pd.read_excel(CONSTRAINING_DICTIONARY_PATH)["Word"].tolist())
uncertainity_words = set(pd.read_excel(UNCERTAINITY_DICTIONARY_PATH)["Word"].tolist())

In [27]:
with open(STOPWORDS_GENERIC_PATH,'r') as f:
  generic_stopwords = set(f.readlines())

with open(STOPWORDS_GENERIC_LONG_PATH,'r') as f:
  long_stopwords = set(f.readlines())

nltk_stopwords = set(nltk.corpus.stopwords.words('english'))

all_stopwords = nltk_stopwords | generic_stopwords | long_stopwords # Take union of all sets

In [28]:
print(f"No. of positive words - {len(positive_words)}")

print(f"No. of negative words - {len(negative_words)}")

print(f"No. of generic stopwords - {len(generic_stopwords)}")

print(f"No. of long stopwords - {len(long_stopwords)}")

print(f"No. of nltk stopwords - {len(nltk_stopwords)}")

print(f"No. of all stopwords - {len(all_stopwords)}")

No. of positive words - 347
No. of negative words - 2345
No. of generic stopwords - 121
No. of long stopwords - 570
No. of nltk stopwords - 179
No. of all stopwords - 870


In [29]:
cik_data = pd.read_excel(CIK_DATA_PATH)

In [30]:
cik_data.sample(3)

Unnamed: 0,CIK,CONAME,FYRMO,FDATE,FORM,SECFNAME
25,3982,ALLIS CHALMERS ENERGY INC.,200611,2006-11-08,10-Q,edgar/data/3982/0000950129-06-009522.txt
11,3662,SUNBEAM CORP/FL/,199906,1999-06-11,10-Q,edgar/data/3662/0000950170-99-001005.txt
68,5588,AMERICAN PAD & PAPER CO,199708,1997-08-14,10-Q,edgar/data/5588/0000950134-97-006223.txt


In [31]:
cik_data.sample(3)

Unnamed: 0,CIK,CONAME,FYRMO,FDATE,FORM,SECFNAME
53,4515,AMERICAN AIRLINES INC,200810,2008-10-16,10-Q,edgar/data/4515/0000004515-08-000073.txt
80,5907,AT&T CORP,200011,2000-11-14,10-Q,edgar/data/5907/0000005907-00-000038.txt
7,3662,SUNBEAM CORP/FL/,199812,1998-12-22,10-Q,edgar/data/3662/0000950170-98-002402.txt


In [32]:
download_links = cik_data["SECFNAME"].apply(lambda link: BASE_URL+link)

In [33]:
print(f"No. of files to download - {len(download_links)}")

No. of files to download - 152


In [34]:
def download_sec_data(links_to_download,download_dir):
  """
  Downloads SEC data.
  Stops on the first failure


  Params:
    links_to_download([str]) - List of links to download
    download_dir(str) - Directory where to download files

  Returns:
    status(int) - 0(success), 1(failure)
  """
  if not os.path.exists(download_dir):
    os.mkdir(download_dir)
    print(f"Created directory {download_dir}")

  try:
    with requests.Session() as session:
      for i,link in enumerate(tqdm(links_to_download)):
        response = session.get(link,headers=headers)

        FILE_NAME = os.path.join(download_dir,f"{i}.txt")

        with open(FILE_NAME,"w") as f:
          f.write(response.text)
        
        time.sleep(DOWNLOAD_DELAY) # Add delay in between requests so SEC doesn't block our IP
  except Exception as e:
    print(f"Download failed")
    print(e)
    return 1

  print("Download successful")

  return 0

In [35]:
%%time
#download_sec_data(download_links,SEC_DATA_DIR)

  0%|          | 0/152 [00:00<?, ?it/s]

Created directory drive/MyDrive/Blackcoffer/sec_data


100%|██████████| 152/152 [00:43<00:00,  3.47it/s]

Download successful
CPU times: user 1.42 s, sys: 265 ms, total: 1.68 s
Wall time: 43.8 s





0

### Calculating metrics

In [36]:
def get_num_of_syllables(word):
  word = word.lower()
  vowels = ['a','e','i','o','u']

  syllables = 0

  for char in word:
    if char in vowels:
      syllables += 1

  return syllables

def get_scores(words):
  """
  Calculate various scores such as positive_score,constraining_score

  Params:
    words([str]) - List of words

  Returns:
    result((int)) - (positive_score,negative_score,
          uncertainty_score,constraining_score,complex_word_count)
  """
  positive_score = 0
  negative_score = 0
  uncertainty_score  = 0
  constraining_score = 0
  complex_word_count = 0

  for word in words:
    num_of_syllables = get_num_of_syllables(word)
    if num_of_syllables > 2:
      complex_word_count += 1
    if word in positive_words:
      positive_score += 1
    if word in negative_words:
      negative_score += 1
    if word in uncertainity_words:
      uncertainty_score += 1
    if word in constraining_words:
      constraining_score += 1

  return (positive_score,negative_score,
          uncertainty_score,constraining_score,complex_word_count)

def get_polarity_score(positive_score,negative_score):
  """
  This is the score that determines if a given text is positive or negative in nature. 
  It is calculated by using the formula:
  Polarity Score = (Positive Score – Negative Score)/ ((Positive Score + Negative Score) + 0.000001)
  Range is from -1 to +1    
  """
  return (positive_score-negative_score)/(positive_score+negative_score+0.000001)

def get_subjectivity_score(positive_score,negative_score,num_of_words_after_cleaning):
  """
  This is the score that determines if a given text is objective or subjective.
  It is calculated by using the formula:
  Subjectivity Score = (Positive Score + Negative Score)/ ((Total Words after cleaning) + 0.000001)
  Range is from 0 to +1
  """
  return (positive_score+negative_score)/(num_of_words_after_cleaning+0.000001)


def get_sentiment_category(polarity):
  if polarity < -0.5:
    return "Most Negative"
  elif -0.5 < polarity < 0:
    return "Negative"
  elif polarity == 0.0:
    return "Neutral"
  elif 0 < polarity < 0.5:
    return "Positive"
  else:
    return "Very Positive"


def get_average_sentence_length(num_of_words,num_of_sentences):
  "Average Sentence Length = the number of words / the number of sentences"
  return (num_of_words/(num_of_sentences+1)) # +1 to avoid Division by 0

def get_percentage_of_complex_words(num_of_complex_words,num_of_words):
  "Percentage of Complex words = the number of complex words / the number of words"
  return (num_of_complex_words/(num_of_words+1)) # +1 to avoid Division by 0

def get_fog_index(average_sentence_length,percentage_of_complex_words):
  "Fog Index = 0.4 * (Average Sentence Length + Percentage of Complex words)"
  return (0.4 * (average_sentence_length + percentage_of_complex_words))



### Text preprocessing

In [37]:
def remove_punctuation(text):
  return ("".join([char for char in text if char not in string.punctuation]))

def remove_stop_words(words,stopwords):
  words = [word for word in words if word not in stopwords]

  return words

def lemmatize(words):
    word_net = nltk.WordNetLemmatizer()
    return [word_net.lemmatize(word) for word in words]

def remove_useless_text(text):
  """
  Remove useless text using heuristics
  """

  return text


def remove_html(text):
  text = BeautifulSoup(text, "html.parser").get_text()
  return text


def preprocess(text):
  text = text.lower()
  # Remove html tags
  text = remove_html(text)

  text = remove_punctuation(text)

  words = nltk.word_tokenize(text)

  words = lemmatize(words)

  words = remove_stop_words(words,all_stopwords) # Needs clarification, which set of stopwords to use?

  return words

In [38]:
def analyze(data_dir):
  """
  Perform the whole analysis
  """
  for index,file_name in enumerate(tqdm(os.listdir(data_dir))):
    file_path = os.path.join(data_dir,file_name)

    with open(file_path,'r') as f:
      text = f.read()

    sentences = nltk.sent_tokenize(text)
    words = preprocess(text)

    num_of_words = len(words)
    num_of_sentences = len(sentences)

    positive_score,negative_score,uncertainty_score,constraining_score,complex_word_count = get_scores(words)

    polarity_score = get_polarity_score(positive_score,negative_score)

    average_sentence_length = get_average_sentence_length(num_of_words,num_of_sentences)

    percentage_of_complex_words = complex_word_count/(num_of_words+1)

    positive_word_proportion = positive_score/(num_of_words+1)
    negative_word_proportion = negative_score/(num_of_words+1)

    uncertainty_word_proportion = uncertainty_score/(num_of_words+1)
    constraining_word_proportion = constraining_score/(num_of_words+1)

    fog_index = get_fog_index(average_sentence_length,percentage_of_complex_words)

    constraining_words_whole_report = constraining_score # This needs to be clarified and fixed

    cik_data.loc[index,"positive_score"] = positive_score
    cik_data.loc[index,"negative_score"] = negative_score
    cik_data.loc[index,"polarity_score"] = polarity_score
    cik_data.loc[index,"average_sentence_length"] = average_sentence_length
    cik_data.loc[index,"percentage_of_complex_words"] = percentage_of_complex_words
    cik_data.loc[index,"fog_index"] = fog_index
    cik_data.loc[index,"complex_word_count"] = complex_word_count
    cik_data.loc[index,"word_count"] = num_of_words
    cik_data.loc[index,"uncertainty_score"] = uncertainty_score
    cik_data.loc[index,"constraining_score"] = constraining_score
    cik_data.loc[index,"positive_word_proportion"] = positive_word_proportion
    cik_data.loc[index,"negative_word_proportion"] = negative_word_proportion
    cik_data.loc[index,"uncertainty_word_proportion"] = uncertainty_word_proportion
    cik_data.loc[index,"constraining_word_proportion"] = constraining_word_proportion
    cik_data.loc[index,"constraining_words_whole_report"] = constraining_words_whole_report

In [39]:
%%time
analyze(SEC_DATA_DIR)

100%|██████████| 152/152 [02:43<00:00,  1.08s/it]

CPU times: user 2min 42s, sys: 837 ms, total: 2min 42s
Wall time: 2min 43s





In [40]:
cik_data.to_excel(OUTPUT_FILE_PATH)

### Test preprocessing

In [41]:
sample_file = os.path.join(SEC_DATA_DIR,"0.txt")

with open(sample_file) as f:
  sample_text = f.read()

In [42]:
%%time
processed_sample_text = preprocess(sample_text)

CPU times: user 1.48 s, sys: 7.35 ms, total: 1.49 s
Wall time: 1.51 s


In [43]:
print(processed_sample_text[:20])

['begin', 'privacyenhanced', 'message', 'proctype', '2001micclear', 'originatorname', 'webmasterwwwsecgov', 'originatorkeyasymmetric', 'mfgwcgyevqgbaqicaf8dsgawrwjaw2snkk9avtbzyzmr6agjlwyk3xmzv3dtinen', 'twsm7vrzladbmyqaionwg5sdw3p6oam5d3tdezxmm7z1tbtwidaqab', 'micinfo', 'rsamd5rsa', 'evpdkfnjzbijwkek2rgnck152qxomhpnldwlxttxbuazk70ayyrsxlqbyiqr', 'v5559qrytgpe9pfvt0db9q', '000095017098000413txt', '19980309', '000095017098000413hdrsgml', '19980309', 'accession', 'number']
