# Latam Challenge!

The first step is to set the file path. In this example, the file is in the same folder as this notebook.

In [None]:
file_path = "farmers-protest-tweets-2021-2-4.json"

Install the package needed to measure memory usage. The following line is commented out on the assumption that memory_profiler is already installed.

In [None]:
#%pip install memory_profiler

Load the package memory_profiler and other needed packages:

In [None]:
%load_ext memory_profiler
import json
import re 
from datetime import datetime
from collections import defaultdict, Counter
from typing import List, Tuple
from concurrent.futures import ThreadPoolExecutor

Next, check that the JSON file exists and that every line in it is valid JSON.

In [None]:
def is_valid_json_file(file_path: str) -> bool:
    try:
        with open(file_path, 'r', encoding='utf-8') as file: # read JSON 
            for line in file:
                json.loads(line) 
        return True
    except (FileNotFoundError, json.JSONDecodeError):
        return False

if is_valid_json_file(file_path):
    print("The JSON file exists and the content is valid.")
else:
    print("The JSON file content is not valid or the file does not exist.")
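
When the check fails, it helps to know where. The sketch below is a hypothetical helper (`first_invalid_line` is not part of the challenge files) that reports the number of the first line that does not parse. It takes any iterable of lines, so an open file object works as well as the in-memory lists used here.

```python
import json
from typing import Iterable, Optional

def first_invalid_line(lines: Iterable[str]) -> Optional[int]:
    """Return the 1-based number of the first line that is not valid JSON, or None."""
    for number, line in enumerate(lines, start=1):
        try:
            json.loads(line)
        except json.JSONDecodeError:
            return number
    return None

# Made-up samples: one clean, one with a broken second line.
clean = ['{"a": 1}', '{"b": 2}']
broken = ['{"a": 1}', '{broken']

print(first_invalid_line(clean))
print(first_invalid_line(broken))
```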

## First challenge:
The top 10 dates with the most tweets. Mention the user (username) with the most posts for each of those days.

I started by writing the code without thinking about any optimization.

The execution time is shown as a baseline for the optimizations that follow.

In [None]:
def q1(file_path: str) -> List[Tuple[datetime.date, str]]:
    
    tweet_count_by_date = defaultdict(int) # dictionary to store tweet counts for each date
    user_count_by_date = defaultdict(Counter) # dictionary to store per-user tweet counts for each date

    with open(file_path, 'r', encoding='utf-8') as file: # read JSON file line by line
        for line in file:
            tweet = json.loads(line) # parse each line as JSON
            tweet_date = datetime.strptime(tweet['date'], '%Y-%m-%dT%H:%M:%S+00:00').date() # extract date
            username = tweet['user']['username'] # extract username
            
            tweet_count_by_date[tweet_date] += 1 # update tweet count for this date
            user_count_by_date[tweet_date][username] += 1 # update this user's count for this date

    # sort dates by tweet count in descending order
    sorted_dates = sorted(tweet_count_by_date.items(), key=lambda x: x[1], reverse=True)[:10]

    # return top 10 dates with the most tweets and the user with the most posts for each date
    result = [(date, user_count_by_date[date].most_common(1)[0][0]) for date, _ in sorted_dates]
    return result

%timeit q1(file_path)

Now let's import the challenge files. The %load magic is used to load each external file.

Starting with Challenge 1, optimized for execution time.

In [None]:
# %load q1_time.py

def q1_time(file_path: str) -> List[Tuple[datetime.date, str]]:
    tweets = []

    with open(file_path, 'r', encoding='utf-8') as json_file: # load all tweets into memory
        for line in json_file:
            tweet = json.loads(line)
            tweets.append(tweet)

    # process tweets
    date_tweet_count = defaultdict(int)
    date_user_tweets = defaultdict(lambda: defaultdict(int))

    for tweet in tweets:
        date_str = tweet["date"][:10]  # extract date part from the datetime string
        date_tweet_count[date_str] += 1
        date_user_tweets[date_str][tweet["user"]["username"]] += 1  # extract user part from the string

    # rank the date strings by tweet count and keep the top 10
    top_dates = sorted(date_tweet_count, key=date_tweet_count.get, reverse=True)[:10]

    top_dates_users = []
    for date in top_dates:
        user_tweets = date_user_tweets[date]
        top_user = max(user_tweets, key=user_tweets.get)
        top_dates_users.append((datetime.strptime(date, '%Y-%m-%d').date(), top_user)) # put in the expected format

    return top_dates_users

The following code runs the function and prints the returned result.

In [None]:
# Result returned by the function:
result = q1_time(file_path)
print(result)

Time optimizations:

* All tweets are loaded into memory in the tweets list. Reading the entire file up front consumes more memory, but it can speed up processing because disk I/O is finished before the counting loop starts. It may not be suitable for very large files that cannot fit into memory.
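
Besides holding the tweets in memory, q1_time slices the first ten characters of each timestamp instead of parsing the full timestamp for every tweet, and only converts the final results to date objects. A minimal sketch, assuming a made-up timestamp in the format the naive q1 expects, checks that both routes give the same date:

```python
from datetime import datetime

# Hypothetical timestamp in the format the dataset appears to use.
sample = "2021-02-12T17:45:10+00:00"

# Full parse, as in the naive q1: done once per tweet.
parsed = datetime.strptime(sample, "%Y-%m-%dT%H:%M:%S+00:00").date()

# Cheap route, as in q1_time: slice the string while counting,
# then parse only the handful of dates that are returned.
sliced = datetime.strptime(sample[:10], "%Y-%m-%d").date()

print(parsed == sliced)
```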

In the code below, the %timeit command shows the execution time.

In [None]:
%timeit q1_time(file_path)

The code below measures memory usage while q1_time runs. It serves only as a baseline, to show that the memory-optimized version reduces memory usage.

In [None]:
%memit q1_time(file_path)
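
The %memit magic only works inside IPython. Outside a notebook, the standard library's tracemalloc module gives a rough view of the same trade-off. The sketch below is an illustration on synthetic data, not a measurement of the challenge functions: `peak_bytes`, `load_all`, and `stream` are hypothetical names, and tracemalloc counts only allocations made by Python, so its numbers will not match %memit.

```python
import tracemalloc

def peak_bytes(func):
    """Run func and return the peak number of bytes Python allocated meanwhile."""
    tracemalloc.start()
    func()
    _, peak = tracemalloc.get_traced_memory()
    tracemalloc.stop()
    return peak

def load_all():
    # Build the whole list first, like loading every tweet into memory.
    rows = [str(i) * 10 for i in range(50_000)]
    return sum(len(r) for r in rows)

def stream():
    # Handle one item at a time, like reading the file line by line.
    return sum(len(str(i) * 10) for i in range(50_000))

peak_list = peak_bytes(load_all)
peak_stream = peak_bytes(stream)
print(peak_list > peak_stream)
```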

As mentioned above, the following code is optimized for memory usage:

In [None]:
# %load q1_memory.py

def q1_memory(file_path: str) -> List[Tuple[datetime.date, str]]:
    date_tweet_count = defaultdict(int)  # total tweets per date
    date_user_tweets = defaultdict(Counter)  # tweets per username, per date

    with open(file_path, 'r', encoding='utf-8') as json_file:
        for line in json_file:  # stream the file; no list of parsed tweets is kept
            tweet = json.loads(line)
            date_str = tweet["date"][:10]  # extract date part from the datetime string
            date_tweet_count[date_str] += 1
            date_user_tweets[date_str][tweet["user"]["username"]] += 1

    # rank dates by total tweets and keep the top 10
    top_dates = sorted(date_tweet_count, key=date_tweet_count.get, reverse=True)[:10]

    return [(datetime.strptime(date_str, '%Y-%m-%d').date(), date_user_tweets[date_str].most_common(1)[0][0])
            for date_str in top_dates]

Optimizations made to reduce memory usage:

* The file is streamed line by line, so the parsed tweets are never held in a list. Each tweet is discarded as soon as it has been counted.
* Only counts are kept: date_tweet_count stores the total per date, and date_user_tweets maps each date to a Counter of tweets per username. The per-user counts are needed to name the correct top user for each date.
* After the single pass over the file, the dates are ranked by total tweets, and only the top 10 are converted to date objects and returned.
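
The streaming idea can be checked on a few made-up records without touching the real file. The sketch below is a minimal illustration under stated assumptions: the sample tweets and usernames are invented, io.StringIO stands in for the file, and heapq.nlargest is used here as one way to pick the top dates without sorting all of them.

```python
import heapq
import io
import json
from collections import Counter, defaultdict

# Made-up JSON Lines sample standing in for the real file.
sample = io.StringIO("\n".join(json.dumps(t) for t in [
    {"date": "2021-02-12T10:00:00+00:00", "user": {"username": "ana"}},
    {"date": "2021-02-12T11:00:00+00:00", "user": {"username": "ana"}},
    {"date": "2021-02-12T12:00:00+00:00", "user": {"username": "bob"}},
    {"date": "2021-02-13T09:00:00+00:00", "user": {"username": "bob"}},
]))

# One pass: keep only counts, never the parsed tweets.
per_date = defaultdict(Counter)
for line in sample:
    tweet = json.loads(line)
    per_date[tweet["date"][:10]][tweet["user"]["username"]] += 1

# Top dates by total tweets, each paired with its most active user.
top_dates = heapq.nlargest(10, per_date, key=lambda d: sum(per_date[d].values()))
summary = [(d, per_date[d].most_common(1)[0][0]) for d in top_dates]
print(summary)  # [('2021-02-12', 'ana'), ('2021-02-13', 'bob')]
```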

The code below shows the memory usage after optimization.

In [None]:
%memit q1_memory(file_path)

## Second challenge:
The top 10 most used emojis with their respective counts. 

I started coding this challenge without thinking about optimization.

In [None]:
def extract_emojis(text: str) -> List[str]:
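    # Ranges are written as code points: Python 3 strings store each emoji as a single code point, so surrogate-pair patterns never match
    emoji_pattern = r'[\U0001F300-\U0001F5FF\U0001F900-\U0001F9FF\u2600-\u2B55]'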
    emojis = [match.group() for match in re.finditer(emoji_pattern, text)]
    return emojis

def q2(file_path: str) -> List[Tuple[str, int]]:
    emoji_counter = Counter()

    with open(file_path, 'r', encoding='utf-8') as json_file:
        for line in json_file:
            tweet = json.loads(line)
            emojis = extract_emojis(tweet["content"])
            emoji_counter.update(emojis)

    top_emojis = emoji_counter.most_common(10) # get the top 10 most used emojis
    return top_emojis

%timeit q2(file_path) # shows the execution time for the code

In [None]:
# %load q2_time.py

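# Precompile the regex pattern for extracting emojis.
# Ranges are written as code points: Python 3 strings store each emoji as a single code point, so surrogate-pair patterns never match.
emoji_pattern = re.compile(r'[\U0001F300-\U0001F5FF\U0001F900-\U0001F9FF\u2600-\u2B55]')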

def extract_emojis(text: str) -> List[str]:
    return emoji_pattern.findall(text)

def q2_time(file_path: str) -> List[Tuple[str, int]]:
    emoji_counter = Counter()

    with open(file_path, 'r', encoding='utf-8') as json_file:
        for line in json_file:
            tweet = json.loads(line)
            emojis = extract_emojis(tweet["content"])
            emoji_counter.update(emojis)

    top_emojis = emoji_counter.most_common(10)  # get the top 10 most used emojis
    return top_emojis

This modification precompiles the regex pattern once, outside the loop, so it is not looked up or compiled again for every tweet. It also calls findall directly instead of building the list from finditer match objects. The emojis are still counted with Counter.
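
A minimal sketch of the precompiled approach on a made-up string. The ranges are an assumption about which characters should count as emojis, written as code points because Python 3 strings hold each emoji as one code point rather than a surrogate pair. The name `EMOJI` and the sample text are illustrative only.

```python
import re
from collections import Counter

# Compiled once at import time. Assumed ranges, written as code points.
EMOJI = re.compile(r"[\U0001F300-\U0001F5FF\U0001F900-\U0001F9FF\u2600-\u2B55]")

# Made-up tweet text: two sheaves of rice (U+1F33E) and one raised fist (U+270A).
text = "Support farmers \U0001F33E\U0001F33E \u270A"

counts = Counter(EMOJI.findall(text))
print(counts.most_common(2))
```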

The next code shows the result of the function:

In [None]:
# Result returned by the function:
result = q2_time(file_path)
print(result)

The following code shows the execution time of the function:

In [None]:
%timeit q2_time(file_path)

The following code measures the memory usage before optimization.

In [None]:
%memit q2_time(file_path)

The code below shows the same logic, now optimized for memory usage:

In [None]:
# %load q2_memory.py

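# precompile the regex pattern for extracting emojis
# ranges are written as code points: Python 3 strings store each emoji as a single code point, so surrogate-pair patterns never match
emoji_pattern = re.compile(r'[\U0001F300-\U0001F5FF\U0001F900-\U0001F9FF\u2600-\u2B55]')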

def q2_memory(file_path: str) -> List[Tuple[str, int]]:
    emoji_counter = Counter()

    with open(file_path, 'r', encoding='utf-8') as json_file:
        for line in json_file:
            tweet = json.loads(line)
            # iterate over the emojis in the tweet and update the counter
            for emoji in emoji_pattern.finditer(tweet["content"]):
                emoji_counter[emoji.group()] += 1

    top_emojis = emoji_counter.most_common(10) # get the top 10 most used emojis
    return top_emojis

In this version, the function iterates over each emoji in the tweet text using emoji_pattern.finditer() and updates the Counter directly. This avoids storing all the emojis of a tweet in a list before updating the counter.

The following code shows the memory usage after optimization.

In [None]:
%memit q2_memory(file_path)

## Third challenge:
The all-time top 10 most influential users (by username), based on the number of mentions (@) each one receives.

To begin, I coded without optimization. 

In [None]:
def extract_mentions(text: str) -> List[str]:
    # extract mentions (@) using a simple regex pattern
    mention_pattern = r'@(\w+)'
    mentions = re.findall(mention_pattern, text)
    return mentions

def process_tweet(line, mention_counter):  # process the tweets
    tweet = json.loads(line)
    mentions = extract_mentions(tweet["content"]) #extract the content
    mention_counter.update(mentions)

def q3(file_path: str) -> List[Tuple[str, int]]:
    mention_counter = Counter()

    with open(file_path, 'r', encoding='utf-8') as json_file:
        with ThreadPoolExecutor() as executor:
            executor.map(lambda line: process_tweet(line, mention_counter), json_file)

    top_mentions = mention_counter.most_common(10) # get the top 10 most mentioned users
    return top_mentions

In [None]:
%timeit q3(file_path)

In [None]:
# %load q3_time.py

def extract_mentions(text: str) -> List[str]:
    # extract mentions (@) using a simple regex pattern
    mention_pattern = r'@(\w+)'
    mentions = re.findall(mention_pattern, text)
    return mentions

def process_tweets(lines): # process the tweets
    mention_counter = Counter()
    for line in lines:
        tweet = json.loads(line)
        mentions = extract_mentions(tweet["content"]) # extract mentions
        mention_counter.update(mentions)
    return mention_counter

def q3_time(file_path: str) -> List[Tuple[str, int]]:
    mention_counter = Counter()

    with open(file_path, 'r', encoding='utf-8') as json_file: # read lines from file
        lines = json_file.readlines()

    # determine the number of workers for ThreadPoolExecutor
    # cap at 32 to prevent excessive resource usage; keep at least 1 so an empty file does not raise an error
    num_workers = max(1, min(32, len(lines)))

    # Divide the lines into chunks for parallel processing
    chunk_size = max(1, len(lines) // num_workers)
    chunks = [lines[i:i+chunk_size] for i in range(0, len(lines), chunk_size)]
   
    with ThreadPoolExecutor(max_workers=num_workers) as executor:  # process tweets using ThreadPoolExecutor
        results = executor.map(process_tweets, chunks)
    
    for result in results: # combine results
        mention_counter.update(result)

    top_mentions = mention_counter.most_common(10) # get the top 10 most mentioned users
    return top_mentions

This code uses ThreadPoolExecutor to parallelize the processing of tweets. The process_tweets function extracts the mentions from one chunk of tweets, and the ThreadPoolExecutor distributes the chunks across multiple threads.

It explicitly sets the max_workers parameter when creating the ThreadPoolExecutor object. The number of workers is derived from the number of lines in the input file and capped at 32, so that it doesn't use too many workers, which could lead to excessive resource usage.
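
The chunk-and-merge pattern can be sketched on a few made-up strings. This is an illustration under stated assumptions, not the challenge code: `count_mentions` and `chunked` are hypothetical helpers, the worker cap of 4 is arbitrary, and ceiling division is used so the number of chunks never exceeds the number of workers.

```python
import re
from collections import Counter
from concurrent.futures import ThreadPoolExecutor

MENTION = re.compile(r"@(\w+)")

def count_mentions(texts):
    """Count the mentions in one chunk of texts."""
    counter = Counter()
    for text in texts:
        counter.update(MENTION.findall(text))
    return counter

def chunked(items, n_chunks):
    """Split items into at most n_chunks contiguous slices."""
    size = max(1, -(-len(items) // n_chunks))  # ceiling division
    return [items[i:i + size] for i in range(0, len(items), size)]

# Made-up tweet texts.
texts = ["thanks @alice", "@bob and @alice", "no mentions", "@alice again", "@bob"]

workers = max(1, min(4, len(texts)))
with ThreadPoolExecutor(max_workers=workers) as executor:
    # Each worker returns its own Counter, so no Counter is shared between threads.
    partials = list(executor.map(count_mentions, chunked(texts, workers)))

total = sum(partials, Counter())
print(total.most_common(2))  # [('alice', 3), ('bob', 2)]
```

Returning one Counter per chunk and merging at the end keeps the threads from writing to the same object at the same time.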

In [None]:
# Result returned by the function:
result = q3_time(file_path)
print(result)

The following code shows the execution time of the function:

In [None]:
%timeit q3_time(file_path)

The following code shows the memory usage. It serves as a baseline for seeing how much the memory optimization improves on it.

In [None]:
%memit q3_time(file_path)

The next code optimizes the memory usage of the third challenge.

In [None]:
# %load q3_memory.py

def extract_mentions(text: str, mention_counter: Counter) -> None:
    mention_pattern = r'@(\w+)'  # regex pattern
    mentions = re.findall(mention_pattern, text)
    mention_counter.update(mentions)

def process_tweet(line, mention_counter): # process the tweets
    tweet = json.loads(line)
    extract_mentions(tweet["content"], mention_counter) # extract the mention

def q3_memory(file_path: str) -> List[Tuple[str, int]]: 
    mention_counter = Counter()

    with open(file_path, 'r', encoding='utf-8') as json_file:
        for line in json_file: # process one line at a time; no thread pool, so no queue of pending lines is held in memory
            process_tweet(line, mention_counter)

    top_mentions = mention_counter.most_common(10) # get the top 10 most mentioned users
    return top_mentions

For memory usage optimization: 

The function processes each line of the file serially without using ThreadPoolExecutor. This reduces memory overhead as only one line is processed at a time.

After processing all lines in the file, the top 10 most mentioned users are retrieved from the mention_counter.

The following code shows the memory usage after optimization.

In [None]:
%memit q3_memory(file_path)

## Post request

The following code makes a POST request.

In [None]:
import requests

# Define the URL and JSON 
url = "https://advana-challenge-check-api-cr-k4hdbggvoq-uc.a.run.app/data-engineer"
payload = {
    "name": "Marlon Oliveira",
    "mail": "oliwer.marlon@gmail.com",
    "github_url": "https://github.com/marlondcu/latam-challenge.git"
}

response = requests.post(url, json=payload, timeout=30)  # make the POST request; the timeout keeps it from hanging indefinitely

if response.status_code == 200: # Check if the request was successful (status code 200)
    print("POST request was successful!")
    print("Response:", response.text)
else:
    print("POST request failed with status code:", response.status_code)

# Notes from the author:

Optimization is a hard task and it takes time. I had to redo it many times: sometimes the results looked good, but when I ran the whole notebook for a final check, the times and memory usage changed again, because these measurements depend directly on the operating system.
I also tested on macOS and Windows, and the behavior differs slightly.
Ideally, I would have written a fourth version combining the time and memory optimizations. However, I did not pursue that and focused on what the challenge required.

The end :) 