In this file you can write whatever you consider appropriate. We recommend detailing your solution and every assumption you are making. Here you can run the functions you defined in the other files of the src folder and measure time, memory, etc.

In [4]:
file_path = "../data/raw/farmers-protest-tweets-2021-2-4.json"


### SparkSession builder and file read

In [5]:
from pyspark.sql import SparkSession

In [6]:
spark = SparkSession.builder.appName("FarmersProtestTweets").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/11/03 12:00:16 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/11/03 12:00:17 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [7]:
df = spark.read.json(file_path)

### Dataframe print schema and content

In [8]:
df.printSchema()

root
 |-- content: string (nullable = true)
 |-- conversationId: long (nullable = true)
 |-- date: string (nullable = true)
 |-- id: long (nullable = true)
 |-- lang: string (nullable = true)
 |-- likeCount: long (nullable = true)
 |-- media: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- duration: double (nullable = true)
 |    |    |-- fullUrl: string (nullable = true)
 |    |    |-- previewUrl: string (nullable = true)
 |    |    |-- thumbnailUrl: string (nullable = true)
 |    |    |-- type: string (nullable = true)
 |    |    |-- variants: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- bitrate: long (nullable = true)
 |    |    |    |    |-- contentType: string (nullable = true)
 |    |    |    |    |-- url: string (nullable = true)
 |-- mentionedUsers: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- created: string (nullable = true)
 |    |   

In [9]:
df.show(5)

+--------------------+-------------------+--------------------+-------------------+----+---------+--------------------+--------------------+--------------------+----------+--------------------+--------------------+----------+------------+--------------+--------------------+-------------------+--------------------+--------------------+--------------------+--------------------+
|             content|     conversationId|                date|                 id|lang|likeCount|               media|      mentionedUsers|            outlinks|quoteCount|         quotedTweet|     renderedContent|replyCount|retweetCount|retweetedTweet|              source|        sourceLabel|           sourceUrl|         tcooutlinks|                 url|                user|
+--------------------+-------------------+--------------------+-------------------+----+---------+--------------------+--------------------+--------------------+----------+--------------------+--------------------+----------+------------+----

#### Creating Temp View for Data Exploration

In [10]:
df.createOrReplaceTempView("farmers_protest")

### Exploring columns to be used in data curation

In [11]:
result = spark.sql("""
    SELECT
        content,
        mentionedUsers
    FROM
        farmers_protest
""")

result.show()

+--------------------+--------------------+
|             content|      mentionedUsers|
+--------------------+--------------------+
|The world progres...|[{NULL, NULL, NUL...|
|#FarmersProtest \...|[{NULL, NULL, NUL...|
|ਪੈਟਰੋਲ ਦੀਆਂ ਕੀਮਤਾ...|                NULL|
|@ReallySwara @roh...|[{NULL, NULL, NUL...|
|#KisanEktaMorcha ...|                NULL|
|Jai jwaan jai kis...|                NULL|
|     #FarmersProtest|                NULL|
|#ModiDontSellFarm...|                NULL|
|@mandeeppunia1 wa...|[{NULL, NULL, NUL...|
|#FarmersProtest h...|                NULL|
|கோதுமைப் பயிர்களை...|                NULL|
|@mandeeppunia1 wa...|[{NULL, NULL, NUL...|
|Another farmer, M...|                NULL|
|Jai kissan #Farme...|                NULL|
|#FarmersProtest h...|                NULL|
|ਸਰਕਾਰੇ ਨੀ ਤੇਰੇ ਕੰ...|                NULL|
|@akshaykumar Hi c...|[{NULL, NULL, NUL...|
|#ModiDontSellFarm...|                NULL|
|@taapsee watch fu...|[{NULL, NULL, NUL...|
|#FarmersProtest h...|          

#### Understanding data usage for challenge

q1. The top 10 dates with the most tweets. Mention the user (username) with the most posts on each of those days.

**Columns: id, date, user.username**
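As a rough illustration of what q1 asks for, the sketch below computes the same kind of result in plain Python over a few made-up rows. The helper name `top_dates_with_top_user` and the sample rows are hypothetical; the notebook's own solution runs in PySpark.

```python
from collections import Counter, defaultdict
from datetime import date


def top_dates_with_top_user(rows, n=10):
    """Return [(date, username)] for the n dates with the most tweets.

    rows is an iterable of (date, username) pairs, one pair per tweet.
    """
    tweets_per_date = Counter()
    users_per_date = defaultdict(Counter)
    for day, username in rows:
        tweets_per_date[day] += 1
        users_per_date[day][username] += 1

    result = []
    for day, _ in tweets_per_date.most_common(n):
        # Most active user on that specific day
        top_user, _ = users_per_date[day].most_common(1)[0]
        result.append((day, top_user))
    return result


# Hypothetical sample rows, not taken from the dataset
sample_rows = [
    (date(2021, 2, 12), "alice"),
    (date(2021, 2, 12), "alice"),
    (date(2021, 2, 12), "bob"),
    (date(2021, 2, 13), "bob"),
]
print(top_dates_with_top_user(sample_rows))
```

Ties are resolved by first appearance here; a production version would need an explicit tie-breaking rule.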


q2. The top 10 most used emojis with their respective counts.

**Columns: id, content**
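A minimal sketch of the emoji count in plain Python follows. The Unicode ranges in `EMOJI_PATTERN` are an assumption made for this example and only approximate the emoji set: flags, skin-tone modifiers, and joined sequences are not handled as single emojis. The function name and sample texts are hypothetical.

```python
import re
from collections import Counter

# Approximate emoji ranges (an assumption for this sketch): pictographs,
# emoticons and transport symbols, plus miscellaneous symbols and dingbats.
EMOJI_PATTERN = re.compile("[\U0001F300-\U0001FAFF\u2600-\u27BF]")


def top_emojis(texts, n=10):
    """Return the n most frequent emoji code points with their counts."""
    counts = Counter()
    for text in texts:
        counts.update(EMOJI_PATTERN.findall(text))
    return counts.most_common(n)


# Hypothetical sample texts, not taken from the dataset
sample_texts = [
    "Support farmers 🙏🙏 #FarmersProtest",
    "Tractor rally 🚜 🙏",
]
print(top_emojis(sample_texts))
```

In Spark, a function like this would typically run per row (for example as a UDF), which is why Q2 can behave differently from Q1 in terms of performance.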


q3. The all-time top 10 most influential users (username), based on the number of mentions (@) each of them receives.

The top-level "mentionedUsers" field appears to hold only null values (either NULL or structs filled with NULLs), so the mentioned users have to be extracted from the tweet content instead.

**Columns: id, content, user.username**
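Extracting mentions from the content can be sketched with a regular expression. The pattern below assumes Twitter's username rules (letters, digits, and underscores, at most 15 characters) and is only an illustration; the function name and sample texts are hypothetical, and handles are counted case-sensitively.

```python
import re
from collections import Counter

# "@" not preceded by a word character (skips e-mail addresses),
# followed by 1 to 15 username characters and no further word character.
MENTION_PATTERN = re.compile(r"(?<![A-Za-z0-9_])@([A-Za-z0-9_]{1,15})(?![A-Za-z0-9_])")


def top_mentioned(texts, n=10):
    """Return the n most mentioned usernames with their mention counts."""
    counts = Counter()
    for text in texts:
        counts.update(MENTION_PATTERN.findall(text))
    return counts.most_common(n)


# Hypothetical sample texts, not taken from the dataset
sample_texts = [
    "@user_a @user_b watch this #FarmersProtest",
    "@user_a thank you",
    "write to someone@example.com",
]
print(top_mentioned(sample_texts))
```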

In [12]:
result = spark.sql("""
    SELECT
        id,
        date,
        content,
        user.username
        
    FROM
        farmers_protest
""")

result.show()

+-------------------+--------------------+--------------------+---------------+
|                 id|                date|             content|       username|
+-------------------+--------------------+--------------------+---------------+
|1364506249291784198|2021-02-24T09:23:...|The world progres...|ArjunSinghPanam|
|1364506237451313155|2021-02-24T09:23:...|#FarmersProtest \...|     PrdeepNain|
|1364506195453767680|2021-02-24T09:23:...|ਪੈਟਰੋਲ ਦੀਆਂ ਕੀਮਤਾ...| parmarmaninder|
|1364506167226032128|2021-02-24T09:23:...|@ReallySwara @roh...|  anmoldhaliwal|
|1364506144002088963|2021-02-24T09:23:...|#KisanEktaMorcha ...|     KotiaPreet|
|1364506120497360896|2021-02-24T09:23:...|Jai jwaan jai kis...|      babli_708|
|1364506076272496640|2021-02-24T09:22:...|     #FarmersProtest|Varinde17354019|
|1364505995859423234|2021-02-24T09:22:...|#ModiDontSellFarm...|    BitnamSingh|
|1364505991887347714|2021-02-24T09:22:...|@mandeeppunia1 wa...|  anmoldhaliwal|
|1364505896576053248|2021-02-24T09:22:..

In [13]:
result.printSchema()

root
 |-- id: long (nullable = true)
 |-- date: string (nullable = true)
 |-- content: string (nullable = true)
 |-- username: string (nullable = true)



##### Checking null values in columns

In [14]:
from pyspark.sql import functions as F

In [15]:
# Count nulls per column; using F.sum avoids shadowing Python's built-in sum
null_counts = result.select([F.sum(F.when(F.col(c).isNull(), 1).otherwise(0)).alias(c) for c in result.columns])

null_counts.show()

[Stage 4:>                                                        (0 + 10) / 10]

+---+----+-------+--------+
| id|date|content|username|
+---+----+-------+--------+
|  0|   0|      0|       0|
+---+----+-------+--------+

There are no null values in the selected columns.

##### Checking duplicated data by id

In [16]:
duplicate_count = result.groupBy("id").count()
duplications = duplicate_count.filter(duplicate_count["count"] > 1)

In [17]:
duplications.show()

[Stage 7:>                                                        (0 + 10) / 10]

+---+-----+
| id|count|
+---+-----+
+---+-----+

There are no duplicate ids in the dataset.

# Challenge Solution

For each question, I measure both memory usage and execution time.

Memory_profiler: I use the memory_profiler library to profile memory consumption line by line in each function. This shows which operations are memory-intensive and where memory usage could be reduced.

Time: I record the current datetime before and after each call in the Jupyter notebook and take the difference. This shows how long each function takes and which parts may benefit from time optimization.
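The measurement approach described above can be sketched as a small helper. This version swaps memory_profiler for `tracemalloc` from the standard library so that it runs without extra dependencies; like memory_profiler, it only sees Python-side memory, not what Spark uses inside the JVM. The helper name `measure` is hypothetical.

```python
import tracemalloc
from datetime import datetime


def measure(func, *args, **kwargs):
    """Run func and return (result, duration, peak_bytes).

    duration is a timedelta from the datetime-difference approach;
    peak_bytes is the peak Python allocation traced by tracemalloc.
    """
    tracemalloc.start()
    start_time = datetime.now()
    try:
        result = func(*args, **kwargs)
    finally:
        end_time = datetime.now()
        _, peak_bytes = tracemalloc.get_traced_memory()
        tracemalloc.stop()
    return result, end_time - start_time, peak_bytes


# Example with a stand-in workload instead of a q1/q2/q3 function
result, duration, peak = measure(sum, range(1_000_000))
print('Duration: {}'.format(duration))
print('Peak Python memory: {:.1f} KiB'.format(peak / 1024))
```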

##### Runtime memory customization for Spark


spark.executor.memory: It controls how much memory each worker (executor) in a Spark cluster can use for processing tasks.

spark.driver.memory: It specifies how much memory the main program (driver) of your Spark application can use for its operations.

These settings are crucial for optimizing memory usage and performance, and you can adjust them based on your specific application needs and available cluster resources. Be cautious not to allocate too much or too little memory, as it can impact performance.

```
from pyspark.sql import SparkSession

# Replace the placeholders with actual sizes, e.g. "2g"
spark = SparkSession.builder \
    .appName("FarmersProtestTweets") \
    .config("spark.executor.memory", "MEMORY_ALLOCATED") \
    .config("spark.driver.memory", "MEMORY_ALLOCATED") \
    .getOrCreate()
```


In [7]:
# Datetime to analyze execution time
from datetime import datetime

In [10]:
STAGING_DATA_PATH = "../data/staging/farmers-protest-tweets-staging.csv"

In [2]:
from pyspark.sql import SparkSession

Creating default SparkSession

In [12]:
sparkSession_default = SparkSession.builder\
    .appName("FarmersProtestTweets")\
    .getOrCreate()


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/11/03 17:53:14 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Creating custom memory allocation SparkSession

In [3]:
sparkSession_optimization = SparkSession.builder \
    .appName("FarmersProtestTweetsOptmization") \
    .config("spark.executor.memory", "2g") \
    .config("spark.driver.memory", "8g") \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/11/03 18:19:59 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Q1 Memory

In [14]:
from q1_memory import q1_memory

In [20]:
start_time = datetime.now()
result_q1_memory = q1_memory(STAGING_DATA_PATH)
end_time = datetime.now()


23/11/03 17:56:55 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


Filename: /Users/rtakeshi/Documents/Projetos/challenge-DE/src/q1_memory.py

Line #    Mem usage    Increment  Occurrences   Line Contents
    17    105.8 MiB    105.8 MiB           1   @profile
    18                                         def q1_memory(file_path: str) -> List[Tuple[datetime.date, str]]:
    19                                             
    20    105.8 MiB      0.0 MiB           1       spark = SparkSession.builder.appName("FarmersProtestTweets").getOrCreate()
    21                                             
    22    105.8 MiB      0.0 MiB           1       df = spark.read.option('delimiter', '~').option('header', True).option('multiline', True).schema(STAGING_SCHEMA).csv(file_path)
    23                                             
    24                                             #Top 10 dates with more content
    25    105.8 MiB      0.0 MiB           1       date_counts = df.groupBy('date').agg(count('content').alias('date_count'))
    26    105.8 MiB    

In [16]:
print('Duration: {}'.format(end_time - start_time))

Duration: 0:00:10.306601


## Q1 Time

In [21]:
from q1_time import q1_time

In [22]:
start_time = datetime.now()
result_q1_time = q1_time(STAGING_DATA_PATH)
end_time = datetime.now()



23/11/03 17:57:07 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
23/11/03 17:57:07 WARN CacheManager: Asked to cache already cached data.


Filename: /Users/rtakeshi/Documents/Projetos/challenge-DE/src/q1_time.py

Line #    Mem usage    Increment  Occurrences   Line Contents
    18    105.8 MiB    105.8 MiB           1   @profile
    19                                         def q1_time(file_path: str) -> List[Tuple[datetime.date, str]]:
    20                                                 
    21    105.8 MiB      0.0 MiB           1       spark = SparkSession.builder.appName("FarmersProtestTweetsOptmization").getOrCreate()
    22                                         
    23                                         
    24    105.8 MiB      0.0 MiB           1       df = spark.read.option('delimiter', '~').option('header', True).option('multiline', True).schema(STAGING_SCHEMA).csv(file_path)
    25    105.8 MiB      0.0 MiB           1       df.persist(StorageLevel.MEMORY_AND_DISK)
    26                                         
    27                                             #Top 10 dates with more content
    28

In [23]:
print('Duration: {}'.format(end_time - start_time))

Duration: 0:00:01.092429


### Q1 Conclusions

1. The memory profiler only measures the Python process and cannot access JVM memory usage, which is where Spark does the actual processing. Consequently, the memory usage appears flat in the report.
2. The staging data (27 MB) is too small for a meaningful performance evaluation in PySpark. To address this, I will create two additional mocked datasets with larger data volumes.

Even with a 2.2 GB data volume, it was still not possible to perform a thorough bottleneck analysis of my code. The following notebook steps are therefore analyzed using a 22 GB mocked dataset.

## Analysis and Time Optimization in PySpark for Q1

For this analysis, Q1 was selected for a detailed examination in the Spark Web UI.

Steps to be followed:

1. Using data/test/mock_volume_data.py, I will generate a dataset of roughly 20 GB.
2. Using the q1_memory function, I will analyze potential bottlenecks in processing via the Spark Web UI. If possible, I will implement caching in q1_time to improve processing time.
3. Subsequently, I will compare differences in Spark job steps regarding memory usage and execution time.
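One simple way to produce such a volume dataset is to repeat the rows of the staging file. The sketch below is only an assumption about how this could be done; the actual data/test/mock_volume_data.py may work differently. It assumes the header sits on the first line and that the file ends with a newline, and the function name `replicate_csv` is hypothetical.

```python
import shutil
from pathlib import Path


def replicate_csv(src, dst, copies):
    """Write dst as the header of src followed by its body repeated `copies` times.

    Returns the size of dst in bytes.
    """
    src, dst = Path(src), Path(dst)
    with src.open("rb") as fin, dst.open("wb") as fout:
        header = fin.readline()      # first line is assumed to be the header
        body_start = fin.tell()
        fout.write(header)
        for _ in range(copies):
            fin.seek(body_start)     # rewind to the first data row
            shutil.copyfileobj(fin, fout)
    return dst.stat().st_size
```

Because the rows are repeated, ids are duplicated in the output. That is acceptable for measuring volume, but not for checking the correctness of the results.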


In [5]:
LARGER_DATA_DIR = "../data/test/test_volume_data20gb.csv"

#### Running q1_memory function with large dataset

In [25]:
result = q1_memory(LARGER_DATA_DIR)

23/11/03 17:57:39 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
[Stage 69:>                                                         (0 + 1) / 1]

Filename: /Users/rtakeshi/Documents/Projetos/challenge-DE/src/q1_memory.py

Line #    Mem usage    Increment  Occurrences   Line Contents
    17    105.8 MiB    105.8 MiB           1   @profile
    18                                         def q1_memory(file_path: str) -> List[Tuple[datetime.date, str]]:
    19                                             
    20    105.8 MiB      0.0 MiB           1       spark = SparkSession.builder.appName("FarmersProtestTweets").getOrCreate()
    21                                             
    22    105.8 MiB      0.0 MiB           1       df = spark.read.option('delimiter', '~').option('header', True).option('multiline', True).schema(STAGING_SCHEMA).csv(file_path)
    23                                             
    24                                             #Top 10 dates with more content
    25    105.8 MiB      0.0 MiB           1       date_counts = df.groupBy('date').agg(count('content').alias('date_count'))
    26    105.8 MiB    

                                                                                

As mentioned earlier, the memory profiler is unable to access memory usage data for Spark jobs. To analyze the execution of my function, I accessed the Spark Web UI and retrieved information about my function's execution.

| Spark job ID | Duration | Peak execution memory | Peak JVM memory | Job step description               |
|--------------|----------|-----------------------|-----------------|------------------------------------|
| 0            | 3.6 min  | 256 KB                | 289 MB          | data read and load                 |
| 1            | 62 ms    | 2.2 MB                | 162 MB          | count by date                      |
| 2            | 3.8 min  | 4.3 MB                | 286 MB          | filtering original df              |
| 3            | 0.2 s    | 5.5 MB                | 0 MB            | count by user in filtered_df       |
| 4            | 0.2 s    | 2.1 MB                | 0 MB            | create analytical window           |
| 5            | 41 ms    | 2.1 MB                | 0 MB            | filtering users by row_number rank |
| 6            | 28 ms    | 2.1 MB                | 0 MB            | classify to generate result        |

The table shows where the time goes. Jobs 0 and 2 dominate: reading the data (3.6 min) and filtering the original DataFrame (3.8 min) each take minutes, while every other job finishes in 0.2 s or less. Peak execution memory stays in the KB to low-MB range throughout, and peak JVM memory is highest in those same two jobs (289 MB and 286 MB). Any time optimization should therefore target the read and the filter.
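To make the imbalance concrete, the short calculation below converts the durations from the table above to seconds and computes how much of the total job time is spent in jobs 0 and 2. It only uses the values reported in the table.

```python
# Durations from the Spark Web UI table above, converted to seconds
job_seconds = {
    0: 3.6 * 60,  # data read and load
    1: 0.062,     # count by date
    2: 3.8 * 60,  # filtering original df
    3: 0.2,       # count by user in filtered_df
    4: 0.2,       # create analytical window
    5: 0.041,     # filtering users by row_number rank
    6: 0.028,     # classify to generate result
}

total = sum(job_seconds.values())
heavy = job_seconds[0] + job_seconds[2]

print('Total job time: {:.1f} s'.format(total))
print('Share of jobs 0 and 2: {:.1%}'.format(heavy / total))
```

The two jobs account for more than 99% of the summed job time, so the remaining five jobs are negligible for optimization purposes.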


### Preliminary conclusions for q1_memory execution


Because the data source is a single, non-partitioned CSV file, PySpark could not parallelize the read, as Spark job 0 shows. The whole file was read by a single task, which explains the long read time. The multiline option used in the read contributes to this, since Spark does not split a multi-line CSV file across tasks.

The filtering step, which selects only the necessary data, reduced memory usage in the subsequent steps. This made the later processing more efficient and partly offset the limitations of the non-partitioned file.



##### Future improvements

In real-world scenarios, if a file needs to be read more than once, it's a good practice to use a more performant file format like Parquet. 

Another technique to consider for such scenarios is repartitioning your data to improve the distribution of the reading process.
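As an illustration of the partitioning idea, the sketch below splits one large CSV into several smaller part files using only the standard library, so that a reader can process the parts in parallel. It is a plain-Python stand-in, not what the notebook does; within Spark, the usual route would be `df.repartition(n)` followed by `df.write.parquet(path)`. The function name, the part-file naming, and the `rows_per_part` parameter are hypothetical.

```python
import csv
from pathlib import Path


def split_csv(src, out_dir, rows_per_part, delimiter="~"):
    """Split src into part files of at most rows_per_part rows each.

    Every part file repeats the header. Returns the list of part paths.
    """
    out_dir = Path(out_dir)
    out_dir.mkdir(parents=True, exist_ok=True)
    parts = []
    fout, writer, rows_in_part = None, None, 0
    with open(src, newline="", encoding="utf-8") as fin:
        # csv.reader keeps quoted multi-line fields together as one row
        reader = csv.reader(fin, delimiter=delimiter)
        header = next(reader)
        for row in reader:
            if writer is None or rows_in_part >= rows_per_part:
                if fout is not None:
                    fout.close()
                path = out_dir / "part-{:05d}.csv".format(len(parts))
                fout = open(path, "w", newline="", encoding="utf-8")
                writer = csv.writer(fout, delimiter=delimiter)
                writer.writerow(header)
                parts.append(path)
                rows_in_part = 0
            writer.writerow(row)
            rows_in_part += 1
    if fout is not None:
        fout.close()
    return parts
```

Passing the output directory to `spark.read.csv` lets Spark assign at least one task per part file instead of a single task for the whole dataset.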



#### Running q1_time with larger dataset

In [4]:
from q1_time import q1_time

The optimization chosen for q1_time is to persist the DataFrame with the MEMORY_AND_DISK storage level. On the first execution, the function persists the data after the initial read, so later accesses to this dataset are faster.

In practice, this is a "slow start": the first execution of the optimized function is more resource-intensive, but subsequent executions are faster.

As a result, the accesses made by Q2 and Q3 to the cached dataset will also be faster.

To analyze this case, I execute the q1_time function repeatedly and compare the runs.

In [6]:
result = q1_time(LARGER_DATA_DIR)

23/11/03 18:22:48 WARN MemoryStore: Not enough space to cache rdd_3_0 in memory! (computed 3.7 GiB so far)
23/11/03 18:22:48 WARN BlockManager: Persisting block rdd_3_0 to disk instead.
23/11/03 18:28:12 WARN BlockManager: Block rdd_3_0 already exists on this machine; not re-adding it
                                                                                

Filename: /Users/rtakeshi/Documents/Projetos/challenge-DE/src/q1_time.py

Line #    Mem usage    Increment  Occurrences   Line Contents
    18     98.1 MiB     98.1 MiB           1   @profile
    19                                         def q1_time(file_path: str) -> List[Tuple[datetime.date, str]]:
    20                                                 
    21     98.1 MiB      0.0 MiB           1       spark = SparkSession.builder.appName("FarmersProtestTweetsOptmization").getOrCreate()
    22                                         
    23                                         
    24     98.1 MiB      0.0 MiB           1       df = spark.read.option('delimiter', '~').option('header', True).option('multiline', True).schema(STAGING_SCHEMA).csv(file_path)
    25     98.1 MiB      0.0 MiB           1       df.persist(StorageLevel.MEMORY_AND_DISK)
    26                                         
    27                                             #Top 10 dates with more content
    28

| Spark job ID | Duration | Peak execution memory | Peak JVM memory | Job step description         |
|--------------|----------|-----------------------|-----------------|------------------------------|
| 0            | 7.5 min  | 0 MB                  | 7 GB            | data read, load, and caching |
| 1            | 0.2 ms   | 2.2 MB                | 162 MB          | count by date                |
| 2            | 32 s     | 256 KB                | 5.2 GB          | filtering original df        |

The other steps are omitted from this analysis.

Total execution time 8 minutes and 48 seconds


The table shows that reading, loading, and caching the data was the main bottleneck of the first execution: job 0 took 7.5 min, about twice as long as the uncached read (3.6 min), and peak JVM memory rose to 7 GB.

In exchange, the filtering job, which took 3.8 min without the cache, ran in 32 s, a reduction of roughly 86%.


In [7]:
result = q1_time(LARGER_DATA_DIR)

23/11/03 18:29:43 WARN CacheManager: Asked to cache already cached data.
[Stage 21:>                                                         (0 + 1) / 1]

Filename: /Users/rtakeshi/Documents/Projetos/challenge-DE/src/q1_time.py

Line #    Mem usage    Increment  Occurrences   Line Contents
    18     41.5 MiB     41.5 MiB           1   @profile
    19                                         def q1_time(file_path: str) -> List[Tuple[datetime.date, str]]:
    20                                                 
    21     41.9 MiB      0.4 MiB           1       spark = SparkSession.builder.appName("FarmersProtestTweetsOptmization").getOrCreate()
    22                                         
    23                                         
    24     42.5 MiB      0.6 MiB           1       df = spark.read.option('delimiter', '~').option('header', True).option('multiline', True).schema(STAGING_SCHEMA).csv(file_path)
    25     42.6 MiB      0.1 MiB           1       df.persist(StorageLevel.MEMORY_AND_DISK)
    26                                         
    27                                             #Top 10 dates with more content
    28

                                                                                

| Spark job ID | Duration | Peak execution memory | Peak JVM memory | Job step description         |
|--------------|----------|-----------------------|-----------------|------------------------------|
| 8            | 29 ms    | 16.2 MB               | 0 B             | data read, load, and caching |
| 9            | 46 ms    | 2.2 MB                | 16 MB           | count by date                |
| 10           | 37 s     | 256 KB                | 2 GB            | filtering original df        |

The other steps are omitted from this analysis.

Total execution time 1 minute 7 seconds


During the second execution, the data read took only 29 ms and reported 0 B of peak JVM memory, because the root DataFrame had already been cached by the previous run. The cost of reading the file was practically eliminated.

The filtering job took 37 s, in line with the 32 s measured in the first cached run, which shows that the gain is stable across executions.
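The "slow start" trade-off can be checked with simple arithmetic on the figures reported in this notebook. The sketch below compares cumulative time with and without caching. One caveat: the uncached figure is the sum of the job durations in the q1_memory table (about 444.5 s), while the cached figures are the total execution times reported for the two q1_time runs, so the comparison is approximate.

```python
# Figures taken from this notebook, in seconds
UNCACHED_RUN = 444.5              # sum of job durations in the q1_memory table
FIRST_CACHED_RUN = 8 * 60 + 48    # first q1_time run: 8 min 48 s
LATER_CACHED_RUN = 1 * 60 + 7     # second q1_time run: 1 min 7 s


def cumulative_seconds(runs, cached):
    """Total time spent after `runs` executions over the same dataset."""
    if runs == 0:
        return 0.0
    if cached:
        # Pay the caching cost once, then reuse the persisted data
        return FIRST_CACHED_RUN + (runs - 1) * LATER_CACHED_RUN
    return runs * UNCACHED_RUN


for runs in (1, 2, 3):
    print('{} run(s): uncached {:.0f} s, cached {:.0f} s'.format(
        runs, cumulative_seconds(runs, False), cumulative_seconds(runs, True)))
```

Under these numbers, caching is slower for a single run but already pays off on the second access to the dataset.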

##### Closing my SparkSessions

In [28]:
sparkSession_default.stop()


In [8]:
sparkSession_optimization.stop()

### Analysis conclusion

In conclusion, this analysis shows that caching is effective for reducing execution time when a large dataset is accessed more than once. Examining the execution steps made it evident that reading the data is the primary bottleneck to address in this challenge.

Assuming that the functions for questions Q2 and Q3 are executed within the same session, the results of the analysis can be extrapolated to the other two scenarios.

Note that for Q2 the analysis would be somewhat different, because UDFs can affect the overall performance of Spark applications.

## Solving Questions

In [5]:
STAGING_DATA_PATH = "../data/staging/farmers-protest-tweets-staging.csv"

#### Q1

q1. The top 10 dates with the most tweets. Mention the user (username) with the most posts on each of those days.


In [None]:
from q1_memory import q1_memory

result_q1 = q1_memory(STAGING_DATA_PATH)
print(result_q1)

#### Q2

q2. The top 10 most used emojis with their respective counts.


In [6]:
from q2_memory import q2_memory

result_q2 = q2_memory(STAGING_DATA_PATH)
print(result_q2)


#### Q3

q3. The all-time top 10 most influential users (username), based on the number of mentions (@) each of them receives.


In [None]:
from q3_memory import q3_memory

result_q3 = q3_memory(STAGING_DATA_PATH)
print(result_q3)