In [1]:
## UTILITIES

import re

def check_distinct(rdd):
  ''' Check whether the first element (key) of every pair in rdd is unique.

  rdd -- RDD of pairs; presumably (tweet_id, tweet_text) items of one user's
         tweets (see the commented-out call in vocab_ratio), in which case
         this checks that no tweet id repeats.

  Prints a one-line verdict and returns True if all keys are distinct,
  False otherwise.
  '''
  tweet_ids = rdd.map(lambda x: x[0])
  # total count first, then distinct count (two separate Spark jobs)
  all_distinct = tweet_ids.count() == tweet_ids.distinct().count()
  if all_distinct:
    print("All tweets are distinct")
  else:
    print("All tweets are NOT distinct")
  return all_distinct
  
  
def process(text):
  ''' Tokenize tweet text into lowercase words.

  Removes every character that is neither a word character nor whitespace
  (i.e. punctuation, emoji, symbols), lowercases the result, and splits it
  on runs of whitespace.

  text -- tweet text (str)
  Returns a list of tokens. It never contains empty strings, so "" -> [].
  '''
  # Keep all whitespace, not just ' ': deleting newlines/tabs would glue
  # adjacent words together ("one\ntwo" -> "onetwo").
  text = re.sub(r'[^\w\s]+', '', text)  # remove punctuation
  # split() with no argument splits on any whitespace run and drops empties
  return text.lower().split()

  
def binary_search(val, arr):
  ''' Look up val in the ascending-sorted sequence arr.

  Uses an inclusive [lo, hi] search window that is halved on each step.
  Returns the index of a matching element, or -1 if val is not present.
  '''
  lo = 0
  hi = len(arr) - 1
  while lo <= hi:
    probe = (lo + hi) // 2
    if val < arr[probe]:
      hi = probe - 1   # val can only be in the left half
      continue
    if val > arr[probe]:
      lo = probe + 1   # val can only be in the right half
      continue
    return probe
  return -1


def clean_word(word):
  ''' Normalize a word: lowercase it and strip every non-word character.

  Letters, digits and underscores are word characters and are kept.
  Returns "" when nothing remains (e.g. for a blank or punctuation-only line).
  '''
  # raw string: '\W' in a plain literal is an invalid escape sequence
  # (SyntaxWarning since Python 3.12); '[\W]+' was equivalent to '\W+'
  return re.sub(r'\W+', '', word.lower())


# Sorted list of legitimate English words, each lowercased with non-word
# characters removed (clean_word). Sorted order is required because
# is_legit looks words up with binary_search.
# Lines that clean to "" (blank or punctuation-only lines) are dropped so the
# empty string never counts as a word; duplicates created by normalization
# (e.g. "Apple" and "apple") are collapsed.
ENGLISH_WORDS = sorted(set(
  sc.textFile("/FileStore/tables/words.txt")
    .map(clean_word)
    .filter(bool)
    .collect()
))


def is_legit(word):
  ''' Check whether a word appears in ENGLISH_WORDS.

  word -- either a plain string, or a (word, count) pair such as the ones
          produced by reduceByKey in vocab_ratio; for a pair only the word
          element is checked.

  Returns True if the word is found (binary search over the sorted
  ENGLISH_WORDS list), False otherwise.
  '''
  # accept a bare string too: indexing one with [0] would check only its
  # first character
  token = word[0] if isinstance(word, tuple) else word
  # NOTE(review): ENGLISH_WORDS is a module-level global referenced from a
  # Spark filter; presumably it is serialized with each task's closure --
  # consider sc.broadcast if that cost shows up.
  return binary_search(token, ENGLISH_WORDS) >= 0

In [2]:
def vocab_ratio(username, tweets, sample_size=5000, seed=None):
  ''' Estimate a user's vocabulary richness from their tweets.

  Draws a random sample of exactly `sample_size` word occurrences, restricted
  to legitimate English words and excluding retweets, and returns the
  fraction of sampled occurrences that are distinct words. A fixed sample
  size keeps ratios comparable across users, because the distinct/total
  ratio shrinks as the sample grows.

  username    -- used only for progress output
  tweets      -- Row whose asDict() yields (tweet id, tweet text) items
  sample_size -- number of word occurrences to sample (default 5000)
  seed        -- optional seed for a reproducible sample

  Returns the ratio as a float, or None if the user has fewer than
  `sample_size` legitimate words.
  Raises ValueError if sample_size is not positive.
  '''
  if sample_size <= 0:
    raise ValueError(f"sample_size must be positive, got {sample_size}")
  rdd = sc.parallelize(tweets.asDict().items())
  print(f"processing tweets of: {username}")
  # extract only the text; skip non-string values (e.g. nulls) and retweets
  tweets_rdd = (rdd.map(lambda x: x[1])
                   .filter(lambda tweet: isinstance(tweet, str) and not tweet.startswith("RT")))
  # Tokenize and count per distinct token so is_legit runs once per distinct
  # token, then expand back to one element per legitimate occurrence.
  words_rdd = (tweets_rdd.flatMap(process)
                         .map(lambda word: (word, 1))
                         .reduceByKey(lambda a, b: a + b)
                         .filter(is_legit)
                         .flatMap(lambda x: [x[0]] * x[1]))
  # words_rdd is scanned more than once (count, then takeSample); cache it
  words_rdd.cache()
  try:
    total_count = words_rdd.count()
    if total_count < sample_size:
      print("not enough words")
      return None
    # takeSample returns exactly `sample_size` elements (sample(fraction=...)
    # only approximates the size and may even return nothing)
    sampled_words = words_rdd.takeSample(False, sample_size, seed)
  finally:
    words_rdd.unpersist()
  distinct_count = len(set(sampled_words))
  sampled_count = len(sampled_words)
  ratio = distinct_count / sampled_count
  print(f"{distinct_count} out of {sampled_count} words. Ratio: {ratio}")
  return ratio
  

In [3]:
file_location = "/FileStore/tables/all_tweets.json"
# Read the file as one multi-line JSON document ("multiline" option) rather
# than one record per line. Each resulting Row is then flattened into its
# (column name, value) pairs, which the next cell unpacks as
# (username, tweets) -- presumably one column per user; TODO confirm schema.
rdd = (
  spark.read
  .option("multiline", "true")
  .json(file_location)
  .rdd
  .flatMap(lambda row: list(row.asDict().items()))
)

In [4]:
# Analyze each user on the driver, one at a time; the value is whatever
# vocab_ratio returns (None when the user has too few words).
ratio_dict = {}
for username, tweets in rdd.collect():
  ratio_dict[username] = vocab_ratio(username, tweets)

In [5]:
# Display the per-user ratios; None means vocab_ratio skipped the user for
# having too few words.
ratio_dict

In [6]:
import json
import os

write_file_location = "/dbfs/FileStore/output/ratio_dict.txt"
# open() does not create missing directories; make sure the output dir exists
os.makedirs(os.path.dirname(write_file_location), exist_ok=True)
# Contents are JSON; the .txt name matches the download URL noted below.
# Users skipped for having too few words have a None ratio, written as null.
with open(write_file_location, 'w') as f:
  json.dump(ratio_dict, f)

In [7]:
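## Downloading the output file
# the cell above writes the results to: "/dbfs/FileStore/output/ratio_dict.txt"
# (other outputs in the same folder follow the same pattern, e.g. "/dbfs/FileStore/output/test_file.txt")
# url to download file: https://community.cloud.databricks.com/files/output/ratio_dict.txt?o=8290779921351936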