En este archivo puedes escribir lo que estimes conveniente. Te recomendamos detallar tu solución y todas las suposiciones que estás considerando. Aquí puedes ejecutar las funciones que definiste en los otros archivos de la carpeta src, medir el tiempo, memoria, etc.

### Installing memory-profiler package to asset memory performance

In [23]:
!pip install -U memory-profiler==0.61.0

Requirement already up-to-date: memory-profiler==0.61.0 in c:\users\usuario\anaconda3\lib\site-packages (0.61.0)


In [1]:
%load_ext memory_profiler

### Installing emoji package to handle emoji count problem

In [37]:
!pip install emoji==2.9.0

Collecting emoji==2.9.0
  Downloading emoji-2.9.0-py2.py3-none-any.whl (397 kB)
Installing collected packages: emoji
Successfully installed emoji-2.9.0


In [2]:
file_path = "farmers-protest-tweets-2021-2-4.json"

# 1. Initializing PySpark

## Here, we're going to use PySpark as our framework to handle tweets json file

In [3]:
from pyspark.sql import SparkSession
# from pyspark import SparkContext, SparkConf

In [4]:
import findspark
findspark.init()

In [5]:
spark = SparkSession \
.builder \
.config('spark.python.profile.memory','true') \
.config('spark.python.profile','true') \
.appName('latam_challenge') \
.getOrCreate()

In [6]:
spark

# 2. Reading Tweets JSON File

##  The goal here is to check file structure and content from a dataframe preview

In [20]:
tweet_list_df = spark.read.json(file_path)

In [21]:
tweet_list_df.printSchema()

root
 |-- content: string (nullable = true)
 |-- conversationId: long (nullable = true)
 |-- date: string (nullable = true)
 |-- id: long (nullable = true)
 |-- lang: string (nullable = true)
 |-- likeCount: long (nullable = true)
 |-- media: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- duration: double (nullable = true)
 |    |    |-- fullUrl: string (nullable = true)
 |    |    |-- previewUrl: string (nullable = true)
 |    |    |-- thumbnailUrl: string (nullable = true)
 |    |    |-- type: string (nullable = true)
 |    |    |-- variants: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- bitrate: long (nullable = true)
 |    |    |    |    |-- contentType: string (nullable = true)
 |    |    |    |    |-- url: string (nullable = true)
 |-- mentionedUsers: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- created: string (nullable = true)
 |    |   

In [8]:
tweet_list_df.show(5)

+--------------------+-------------------+--------------------+-------------------+----+---------+--------------------+--------------------+--------------------+----------+--------------------+--------------------+----------+------------+--------------+--------------------+-------------------+--------------------+--------------------+--------------------+--------------------+
|             content|     conversationId|                date|                 id|lang|likeCount|               media|      mentionedUsers|            outlinks|quoteCount|         quotedTweet|     renderedContent|replyCount|retweetCount|retweetedTweet|              source|        sourceLabel|           sourceUrl|         tcooutlinks|                 url|                user|
+--------------------+-------------------+--------------------+-------------------+----+---------+--------------------+--------------------+--------------------+----------+--------------------+--------------------+----------+------------+----

# 3. Solving problems

# 3.0 Importing output data type and performance libraries

In [7]:
# Output data type
from typing import List, Tuple
from datetime import datetime

# Transformation
from pyspark.sql.functions import to_date, col, row_number, desc, size, expr, explode, filter
from pyspark.sql.window import Window

# Time performance
from cProfile import Profile
from pstats import SortKey, Stats

# Emoji package
import emoji

## 3.1 Time -> Las top 10 fechas donde hay más tweets. Mencionar el usuario (username) que más publicaciones tiene por cada uno de esos días

In [13]:
def q1_time(file_path: str) -> List[Tuple[datetime.date, str]]:
    """
    This ``function`` reads tweets json file and process it to compute the first 10 dates with most tweets
    and their respective user with the most tweets. This, in terms of time performance.

        Args:
            file_path (str): Tweet JSON file which contains tweets from several days.

        Returns:
            q1_time_lst (List): List of tuples containing the result from data processing.
    """

    tweet_list_df = spark.read.json(file_path)

    tweet_list_df = tweet_list_df.coalesce(4)
    
    tweet_list_df = tweet_list_df \
    .withColumn("parsed_date", to_date("date")) \
    .select(col("parsed_date").alias("tweet_dt"),col("user.username").alias("user_name"))
    
    tweet_grp_df = tweet_list_df \
    .groupBy("tweet_dt") \
    .count() \
    .sort(desc("count")) \
    .limit(10)
    
    tweet_usr_grp_df = tweet_list_df \
    .join(tweet_grp_df, "tweet_dt", "inner") \
    .groupBy("tweet_dt", "user_name")\
    .count() \
    .withColumn("rank_num", row_number().over(Window.partitionBy("tweet_dt").orderBy(desc("count")))) \
    .filter(col("rank_num") == 1) \
    .select("tweet_dt","user_name")

    q1_time_lst = tweet_usr_grp_df \
        .rdd.map(lambda x: (x[0], x[1])) \
        .collect()
    
    return q1_time_lst

In [10]:
with Profile() as profile:
    print(q1_time(file_path))
    (
     Stats(profile)
     .strip_dirs()
     .sort_stats(SortKey.CALLS)
     .print_stats()
    )

[(datetime.date(2021, 2, 12), 'RanbirS00614606'), (datetime.date(2021, 2, 13), 'MaanDee08215437'), (datetime.date(2021, 2, 14), 'rebelpacifist'), (datetime.date(2021, 2, 15), 'jot__b'), (datetime.date(2021, 2, 16), 'jot__b'), (datetime.date(2021, 2, 17), 'RaaJVinderkaur'), (datetime.date(2021, 2, 18), 'neetuanjle_nitu'), (datetime.date(2021, 2, 19), 'Preetm91'), (datetime.date(2021, 2, 20), 'MangalJ23056160'), (datetime.date(2021, 2, 23), 'Surrypuria')]
         11372 function calls (11276 primitive calls) in 12.105 seconds

   Ordered by: call count

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
     1795    0.000    0.000    0.001    0.000 {built-in method builtins.isinstance}
  492/488    0.000    0.000    0.000    0.000 {built-in method builtins.len}
      415    0.001    0.000    0.001    0.000 protocol.py:214(smart_decode)
      396    0.001    0.000    0.001    0.000 inspect.py:1525(_static_getmro)
      322    0.000    0.000    0.000    0.000 {method '

## 3.2 Memory -> Las top 10 fechas donde hay más tweets. Mencionar el usuario (username) que más publicaciones tiene por cada uno de esos días

In [15]:
def q1_memory(file_path: str) -> List[Tuple[datetime.date, str]]:
    """
    This ``function`` reads tweets json file and process it to compute the first 10 dates with most tweets
    and their respective user with the most tweets. This, in terms of memory performance.

        Args:
            file_path (str): Tweet JSON file which contains tweets from several days.

        Returns:
            q1_memory_lst (List): List of tuples containing the result from data processing.
    """

    tweet_list_df = spark.read.json(file_path)
    
    tweet_list_df = tweet_list_df \
    .withColumn("parsed_date", to_date("date")) \
    .select(col("parsed_date").alias("tweet_dt"),col("user.username").alias("user_name"))

    tweet_list_df.cache().count()
    
    tweet_grp_df = tweet_list_df \
    .groupBy("tweet_dt") \
    .count() \
    .sort(desc("count")) \
    .limit(10)
    
    tweet_usr_grp_df = tweet_list_df \
    .join(tweet_grp_df, "tweet_dt", "inner") \
    .groupBy("tweet_dt", "user_name")\
    .count() \
    .withColumn("rank_num", row_number().over(Window.partitionBy("tweet_dt").orderBy(desc("count")))) \
    .filter(col("rank_num") == 1) \
    .select("tweet_dt","user_name")

    tweet_list_df.unpersist().cache()

    q1_memory_lst = tweet_usr_grp_df \
        .rdd.map(lambda x: (x[0], x[1])) \
        .collect()
    
    return q1_memory_lst

In [12]:
%memit q1_memory(file_path)

peak memory: 78.42 MiB, increment: 0.07 MiB


## 3.3 Time -> Los top 10 emojis más usados con su respectivo conteo

In [16]:
def q2_time(file_path: str) -> List[Tuple[str, int]]:
    """
    This ``function`` reads tweets json file and process it to compute the first 10 used emojis with their
    respective count. This, in terms of time performance.

        Args:
            file_path (str): Tweet JSON file which contains tweets from several days.

        Returns:
            q2_time_lst (List): List of tuples containing the result from data processing.
    """

    tweet_list_df = spark.read.json(file_path)
    
    re_pattern = r'[\u0080-\uffff\U00010000-\U0001ffff]'
    to_compare = list(emoji.unicode_codes.EMOJI_DATA.keys())
    
    is_emoji = lambda x: x.isin(to_compare)

    tweet_list_df = tweet_list_df.coalesce(4)
    
    tweet_emoji_df = tweet_list_df \
    .withColumn('emoji_lst', expr(f"regexp_extract_all(content, '{re_pattern}', 0)")) \
    .filter(size(col('emoji_lst'))!=0) \
    .select('emoji_lst') \
    .withColumn('emoji_new',filter('emoji_lst', is_emoji)) \
    .filter(size(col('emoji_new'))!=0) \
    .select(explode('emoji_new').alias('emoji_content')) \
    .groupBy("emoji_content") \
    .count() \
    .sort(desc("count")) \
    .limit(10)

    q2_time_lst = tweet_emoji_df \
        .rdd.map(lambda x: (x[0], x[1])) \
        .collect()
    
    return q2_time_lst

In [17]:
with Profile() as profile:
    print(q2_time(file_path))
    (
     Stats(profile)
     .strip_dirs()
     .sort_stats(SortKey.CALLS)
     .print_stats()
    )

[('🙏', 7286), ('😂', 3072), ('🚜', 2972), ('✊', 2411), ('🌾', 2363), ('🏻', 2080), ('❤', 1779), ('🤣', 1668), ('🏽', 1218), ('👇', 1108)]
         1132556 function calls (1118322 primitive calls) in 8.950 seconds

   Ordered by: call count

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
   190983    0.026    0.000    0.032    0.000 {built-in method builtins.isinstance}
43076/43072    0.006    0.000    0.006    0.000 {built-in method builtins.len}
    42961    0.026    0.000    0.026    0.000 inspect.py:1525(_static_getmro)
    38194    0.013    0.000    0.013    0.000 {method 'startswith' of 'str' objects}
    38167    0.033    0.000    0.033    0.000 {method 'format' of 'str' objects}
    38166    0.017    0.000    0.031    0.000 __init__.py:1412(debug)
    38166    0.014    0.000    0.014    0.000 __init__.py:1677(isEnabledFor)
    33520    0.041    0.000    0.051    0.000 protocol.py:214(smart_decode)
    23873    0.037    0.000    0.044    0.000 inspect.py:1553(_s

        1    0.000    0.000    0.001    0.001 session.py:743(read)
        1    0.000    0.000    1.884    1.884 dataframe.py:80(rdd)
        1    0.000    0.000    0.005    0.005 dataframe.py:719(limit)
        1    0.000    0.000    0.006    0.006 dataframe.py:862(coalesce)
        1    0.000    0.000    0.006    0.006 dataframe.py:1388(sort)
        1    0.000    0.000    0.000    0.000 dataframe.py:1450(<listcomp>)
        1    0.000    0.000    0.001    0.001 dataframe.py:1443(_sort_cols)
        1    0.000    0.000    0.003    0.003 dataframe.py:1738(groupBy)
        1    0.006    0.006    4.347    4.347 column.py:597(<listcomp>)
        1    0.000    0.000    4.809    4.809 column.py:581(isin)
        1    0.000    0.000    0.000    0.000 column.py:712(alias)
        1    0.000    0.000    0.000    0.000 readwriter.py:32(_set_opts)
        1    0.000    0.000    0.001    0.001 readwriter.py:52(__init__)
        1    0.000    0.000    0.000    0.000 readwriter.py:56(_df)
        

## 3.4 Memory -> Los top 10 emojis más usados con su respectivo conteo

In [44]:
def q2_memory(file_path: str) -> List[Tuple[str, int]]:
    """
    This ``function`` reads tweets json file and process it to compute the first 10 used emojis with their
    respective count. This, in terms of time performance.

        Args:
            file_path (str): Tweet JSON file which contains tweets from several days.

        Returns:
            q2_memory_lst (List): List of tuples containing the result from data processing.
    """

    tweet_list_df = spark.read.json(file_path)
    
    tweet_list_df.cache().count()
    
    re_pattern = r'[\u0080-\uffff\U00010000-\U0001ffff]'
    to_compare = list(emoji.unicode_codes.EMOJI_DATA.keys())
    
    is_emoji = lambda x: x.isin(to_compare)
    
    tweet_emoji_df = tweet_list_df \
    .withColumn('emoji_lst', expr(f"regexp_extract_all(content, '{re_pattern}', 0)")) \
    .filter(size(col('emoji_lst'))!=0) \
    .select('emoji_lst') \
    .withColumn('emoji_new',filter('emoji_lst', is_emoji)) \
    .filter(size(col('emoji_new'))!=0) \
    .select(explode('emoji_new').alias('emoji_content')) \
    .groupBy("emoji_content") \
    .count() \
    .sort(desc("count")) \
    .limit(10)
    
    tweet_list_df.unpersist().count()

    q2_memory_lst = tweet_emoji_df \
        .rdd.map(lambda x: (x[0], x[1])) \
        .collect()
    
    return q2_memory_lst

In [10]:
%memit q2_memory(file_path)

peak memory: 83.72 MiB, increment: 7.31 MiB


## 3.5 Time -> El top 10 histórico de usuarios (username) más influyentes en función del conteo de las menciones (@) que registra cada uno de ellos

In [42]:
def q3_time(file_path: str) -> List[Tuple[str, int]]:
    """
    This ``function`` reads tweets json file and process it to compute the first 10 influential users with their
    respective count. This, in terms of time performance.

        Args:
            file_path (str): Tweet JSON file which contains tweets from several days.

        Returns:
            q3_time_lst (List): List of tuples containing the result from data processing.
    """

    tweet_list_df = spark.read.json(file_path)
    
    tweet_list_df = tweet_list_df.coalesce(4)
        
    tweet_top_usr_df = tweet_list_df \
    .select('mentionedUsers.username') \
    .filter(col('username').isNotNull()) \
    .select(explode('username').alias('user_name')) \
    .groupBy('user_name') \
    .count() \
    .sort(desc('count')) \
    .limit(10)
    
    q3_time_lst = tweet_top_usr_df \
        .rdd.map(lambda x: (x[0], x[1])) \
        .collect()
    
    return q3_time_lst

In [43]:
with Profile() as profile:
    print(q3_time(file_path))
    (
     Stats(profile)
     .strip_dirs()
     .sort_stats(SortKey.CALLS)
     .print_stats()
    )

[('narendramodi', 2265), ('Kisanektamorcha', 1840), ('RakeshTikaitBKU', 1644), ('PMOIndia', 1427), ('RahulGandhi', 1146), ('GretaThunberg', 1048), ('RaviSinghKA', 1019), ('rihanna', 986), ('UNHumanRights', 962), ('meenaharris', 926)]
         6607 function calls (6580 primitive calls) in 3.381 seconds

   Ordered by: call count

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
      941    0.000    0.000    0.000    0.000 {built-in method builtins.isinstance}
  342/338    0.000    0.000    0.000    0.000 {built-in method builtins.len}
      232    0.000    0.000    0.000    0.000 inspect.py:1525(_static_getmro)
      193    0.000    0.000    0.000    0.000 protocol.py:214(smart_decode)
      191    0.000    0.000    0.000    0.000 {method 'startswith' of 'str' objects}
      165    0.000    0.000    0.000    0.000 {method 'format' of 'str' objects}
      164    0.000    0.000    0.000    0.000 __init__.py:1412(debug)
      164    0.000    0.000    0.000    0.000 

## 3.6 Memory -> El top 10 histórico de usuarios (username) más influyentes en función del conteo de las menciones (@) que registra cada uno de ellos

In [45]:
def q3_memory(file_path: str) -> List[Tuple[str, int]]:
    """
    This ``function`` reads tweets json file and process it to compute the first 10 influential users with their
    respective count. This, in terms of memory performance.

        Args:
            file_path (str): Tweet JSON file which contains tweets from several days.

        Returns:
            q3_memory_lst (List): List of tuples containing the result from data processing.
    """

    tweet_list_df = spark.read.json(file_path)
    
    tweet_list_df.cache().count()
        
    tweet_top_usr_df = tweet_list_df \
    .select('mentionedUsers.username') \
    .filter(col('username').isNotNull()) \
    .select(explode('username').alias('user_name')) \
    .groupBy('user_name') \
    .count() \
    .sort(desc('count')) \
    .limit(10)
    
    tweet_list_df.unpersist().count()
    
    q3_memory_lst = tweet_top_usr_df \
        .rdd.map(lambda x: (x[0], x[1])) \
        .collect()
    
    return q3_memory_lst

In [46]:
%memit q3_memory(file_path)

peak memory: 70.20 MiB, increment: 0.05 MiB
