In this file you can write whatever you consider appropriate. We recommend detailing your solution and all the assumptions you are making. Here you can run the functions you defined in the other files of the src folder, measure time, memory, etc.

## Reading Files

### Import Libraries

In [1]:
#! pip install -U memory_profiler



In [6]:
import json
import pandas as pd
import pyspark
import zipfile
from pyspark.sql import *
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import warnings
import emoji
import time
warnings.filterwarnings('ignore')
%load_ext memory_profiler

The memory_profiler extension is already loaded. To reload it, use:
  %reload_ext memory_profiler


### Converting zip to gz

In [68]:
%%bash
cd ../data
#unzip -p ../data/farmers-tweets.json.zip | gzip > farmers-tweets.json.gz
unzip -p ../data/farmers-protest-tweets-2021-2-4.json.zip | gzip > farmers-protest-tweets-2021-2-4.json.gz
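
The same conversion can be done without a shell. The following is a minimal sketch using only the standard library; the function name `zip_member_to_gzip` is hypothetical, and it assumes the archive contains a single JSON file, which is the case where it matches the `unzip -p ... | gzip` pipeline above.

```python
import gzip
import shutil
import zipfile


def zip_member_to_gzip(zip_path: str, gz_path: str) -> None:
    """Stream the first member of a zip archive into a gzip file."""
    with zipfile.ZipFile(zip_path) as archive:
        # Assumption: the archive holds exactly one file (the tweets JSON).
        member = archive.namelist()[0]
        with archive.open(member) as src, gzip.open(gz_path, "wb") as dst:
            # Copy in chunks so the uncompressed file never sits fully in memory.
            shutil.copyfileobj(src, dst)
```

Calling `zip_member_to_gzip("../data/farmers-protest-tweets-2021-2-4.json.zip", "../data/farmers-protest-tweets-2021-2-4.json.gz")` would then produce the file read in the next section.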

### Reading tweets as pyspark object

Initially the data was a sample of tweets, `./data/farmers-tweets.json`; from here on we load the complete JSON file.

In [2]:
namef = '../data/farmers-protest-tweets-2021-2-4'
#namef = 'farmers-tweets'

file_path = namef+'.json.gz'
print(f"** File to use: {file_path}")

** File to use: ../data/farmers-protest-tweets-2021-2-4.json.gz


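The first test loads the data into a Spark DataFrame.
The `%memit` magic reports the memory used by the Python process. Used as a line magic on its own line, as in the cell below, it measures only an empty statement; the cell magic `%%memit` would be needed to profile the whole cell.

*Note: `spark.read.json` reads the `.gz` file directly.*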

In [86]:
%memit
#Creating SparkSession 
spark = SparkSession.builder.appName('readJson').getOrCreate()
#Read file as pyspark object()   
data = spark.read.json(file_path)
#transformation and renaming columns steps
dfcol = data.withColumn("created_at", data["date"].cast('date'))\
                                .withColumn("user_id", data["user.id"])\
                                .withColumn("username", data["user.username"])

df = dfcol.select(col("created_at"), col("user_id"), col("username")).groupBy("created_at", "username").count()
df.sort(df["count"].desc()).show(10) 

peak memory: 1198.84 MiB, increment: 0.22 MiB


+----------+---------------+-----+
|created_at|       username|count|
+----------+---------------+-----+
|2021-02-19|       Preetm91|  267|
|2021-02-18|neetuanjle_nitu|  195|
|2021-02-17| RaaJVinderkaur|  185|
|2021-02-13|MaanDee08215437|  178|
|2021-02-12|RanbirS00614606|  176|
|2021-02-21|     Surrypuria|  161|
|2021-02-18|  rebelpacifist|  153|
|2021-02-19|KaurDosanjh1979|  138|
|2021-02-23|     Surrypuria|  135|
|2021-02-15|         jot__b|  134|
+----------+---------------+-----+
only showing top 10 rows



### Reading tweets as pandas df

In [9]:
#file_path = "../data/farmers-tweets.json"
file_path = '../data/farmers-protest-tweets-2021-2-4.json'

In [78]:
#Read the file line by line with json.loads(); the context manager closes the handle
with open(file_path, 'r', encoding='utf-8') as fh:
    data = [json.loads(line) for line in fh]
#convert to dataframe
df = pd.DataFrame(data)
#keep only the calendar date as datetime.date, matching the q1 return type
df["created_at"] = pd.to_datetime(df["date"]).dt.date
#lambda functions to create new columns from the nested user field
df['username'] = df['user'].apply(lambda d: d['username'])
df['user_id'] = df['user'].apply(lambda d: d['id'])

dfres = df.groupby(['created_at','username'])['id'].count().reset_index(name="count").sort_values("count", ascending=False) 

print(dfres.head(10))

       created_at         username  count
35219  2021-02-19         Preetm91    267
33193  2021-02-18  neetuanjle_nitu    195
26577  2021-02-17   RaaJVinderkaur    185
7536   2021-02-13  MaanDee08215437    178
2740   2021-02-12  RanbirS00614606    176
42691  2021-02-21       Surrypuria    161
33396  2021-02-18    rebelpacifist    153
34733  2021-02-19  KaurDosanjh1979    138
48696  2021-02-23       Surrypuria    135
18540  2021-02-15           jot__b    134


## Memory Profiler

Track line-by-line memory consumption with the `%mprun` magic command.
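
`%mprun` needs an IPython session. Outside a notebook, one rough standard-library alternative is `tracemalloc`. It tracks only allocations made by the Python interpreter (not the Spark JVM or other native memory), so its numbers are not comparable with the MiB columns shown in this notebook. The helper name `peak_python_memory_mib` is hypothetical; this is a sketch, not what the notebook uses.

```python
import tracemalloc


def peak_python_memory_mib(func, *args, **kwargs):
    """Run func and return (result, peak Python-level allocation in MiB)."""
    tracemalloc.start()
    try:
        result = func(*args, **kwargs)
        # get_traced_memory() returns (current, peak) in bytes.
        _, peak = tracemalloc.get_traced_memory()
    finally:
        tracemalloc.stop()
    return result, peak / (1024 * 1024)
```

For example, `peak_python_memory_mib(q1_time, file_path)` would return the function's result together with the peak Python allocation observed while it ran.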

### Spark function

In [82]:
from q1_memory import q1_memory
file_path = "../data/farmers-protest-tweets-2021-2-4.json.gz"
%mprun -f q1_memory q1_memory(file_path)

+----------+---------------+-----+
|created_at|       username|count|
+----------+---------------+-----+
|2021-02-19|       Preetm91|  267|
|2021-02-18|neetuanjle_nitu|  195|
|2021-02-17| RaaJVinderkaur|  185|
|2021-02-13|MaanDee08215437|  178|
|2021-02-12|RanbirS00614606|  176|
|2021-02-21|     Surrypuria|  161|
|2021-02-18|  rebelpacifist|  153|
|2021-02-19|KaurDosanjh1979|  138|
|2021-02-23|     Surrypuria|  135|
|2021-02-15|         jot__b|  134|
+----------+---------------+-----+
only showing top 10 rows




Filename: /Users/julianromero/Documents/Bench/ProcessTweets/src/q1_memory.py

Line #    Mem usage    Increment  Occurrences   Line Contents
     9    123.7 MiB    123.7 MiB           1   def q1_memory(file_path: str) -> List[Tuple[datetime.date, str]]:
    10                                             
    11                                             #Creating SparkSession 
    12    123.7 MiB      0.0 MiB           1       spark = SparkSession.builder.appName('readJson').getOrCreate()
    13                                             #Read file as pyspark object()   
    14    123.7 MiB      0.0 MiB           1       data = spark.read.json(file_path)    
    15                                             #transformation and renaming columns steps
    16    123.7 MiB      0.0 MiB           2       dfcol = data.withColumn("created_at", data["date"].cast('date'))\
    17    123.7 MiB      0.0 MiB           1                                   .withColumn("user_id", data["user.id"])\
 

### Pandas Function

In [83]:
from q1_time import q1_time
file_path = "../data/farmers-protest-tweets-2021-2-4.json"
%mprun -f q1_time q1_time(file_path)

       created_at         username  count
35219  2021-02-19         Preetm91    267
33193  2021-02-18  neetuanjle_nitu    195
26577  2021-02-17   RaaJVinderkaur    185
7536   2021-02-13  MaanDee08215437    178
2740   2021-02-12  RanbirS00614606    176
42691  2021-02-21       Surrypuria    161
33396  2021-02-18    rebelpacifist    153
34733  2021-02-19  KaurDosanjh1979    138
48696  2021-02-23       Surrypuria    135
18540  2021-02-15           jot__b    134



Filename: /Users/julianromero/Documents/Bench/ProcessTweets/src/q1_time.py

Line #    Mem usage    Increment  Occurrences   Line Contents
    10    123.9 MiB    123.9 MiB           1   def q1_time(file_path: str) -> List[Tuple[datetime.date, str]]:
    11                                             
    12                                             #Read file with json.loads()   
    13   2263.2 MiB -499104.4 MiB      117410       data = [json.loads(line) for line in open(file_path, 'r')]
    14                                             #convert to dataframe
    15   2349.5 MiB     86.3 MiB           1       df = pd.DataFrame(data)
    16                                             #transformation and renaming columns steps
    17   2359.0 MiB      9.6 MiB           1       df["created_at"] = pd.to_datetime(df["date"]).dt.strftime('%Y-%m-%d')
    18   2359.0 MiB -2460732.0 MiB      234815       df['username'] = df['user'].apply(lambda d: d['username'])
    19   2298.9 MiB    -60.2 M

### Generating memory usage reports with `mprof run` and `mprof plot`

### Spark Function

In [88]:
%%bash
echo mprof run /Users/julianromero/.local/share/virtualenvs/ProcessTweets-raqGfvtZ/bin/python /Users/julianromero/Documents/Bench/ProcessTweets/src/q1_memory.py

mprof run /Users/julianromero/.local/share/virtualenvs/ProcessTweets-raqGfvtZ/bin/python /Users/julianromero/Documents/Bench/ProcessTweets/src/q1_memory.py


![Screenshot1.png](attachment:Screenshot1.png)

![memorytime_usage_pyspark.png](attachment:memorytime_usage_pyspark.png)

### Pandas Function

In [89]:
%%bash
echo mprof run /Users/julianromero/.local/share/virtualenvs/ProcessTweets-raqGfvtZ/bin/python /Users/julianromero/Documents/Bench/ProcessTweets/src/q1_time.py

mprof run /Users/julianromero/.local/share/virtualenvs/ProcessTweets-raqGfvtZ/bin/python /Users/julianromero/Documents/Bench/ProcessTweets/src/q1_time.py


![Screenshot2.png](attachment:Screenshot2.png)

![memorytime_usage_pandas.png](attachment:memorytime_usage_pandas.png)

### Solving 1st Challenge, tuple stdout

Top 10 dates with the most tweets. For each of those dates, report the user with the most posts:

Keep the following function signatures:
```python
import datetime
from typing import List, Tuple

def q1_time(file_path: str) -> List[Tuple[datetime.date, str]]: ...

def q1_memory(file_path: str) -> List[Tuple[datetime.date, str]]: ...
```

In [5]:
#Creating SparkSession 
spark = SparkSession.builder.appName('readJson').getOrCreate()
#Read file as pyspark object()   
data = spark.read.json(file_path)
#transformation and renaming columns steps
dfcol = data.withColumn("created_at", data["date"].cast('date'))\
                                .withColumn("user_id", data["user.id"])\
                                .withColumn("username", data["user.username"])

dfaux = dfcol.select(col("created_at"), col("user_id"), col("username")).groupBy("created_at", "username").count()
#df.sort(df["count"].desc()).show(10)   
    
#Convert to pandas df sorted   
df = dfaux.toPandas().sort_values("count", ascending=False)    

#Printing tuples as datetime.date format
print(list(df[['created_at','username']].head(10).itertuples(index=False, name=None)))   

[(datetime.date(2021, 2, 19), 'Preetm91'), (datetime.date(2021, 2, 18), 'neetuanjle_nitu'), (datetime.date(2021, 2, 17), 'RaaJVinderkaur'), (datetime.date(2021, 2, 13), 'MaanDee08215437'), (datetime.date(2021, 2, 12), 'RanbirS00614606'), (datetime.date(2021, 2, 21), 'Surrypuria'), (datetime.date(2021, 2, 18), 'rebelpacifist'), (datetime.date(2021, 2, 19), 'KaurDosanjh1979'), (datetime.date(2021, 2, 23), 'Surrypuria'), (datetime.date(2021, 2, 15), 'jot__b')]
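
The cell above ranks `(created_at, username)` pairs, so a date can appear more than once in the result (2021-02-18 and 2021-02-19 each appear twice). If the task is read as "the ten busiest dates, each with its most active user", one possible approach is sketched below. It streams over tweet dictionaries with the standard library only. The function name `top_dates_with_top_user` is hypothetical, and the sketch assumes each tweet has an ISO 8601 `date` string and a nested `user.username`, as the columns used above suggest.

```python
import datetime
from collections import Counter, defaultdict
from typing import Dict, Iterable, List, Tuple


def top_dates_with_top_user(
    tweets: Iterable[dict], n: int = 10
) -> List[Tuple[datetime.date, str]]:
    """Return the n busiest dates, each paired with its most active user."""
    tweets_per_date: Counter = Counter()
    users_per_date: Dict[datetime.date, Counter] = defaultdict(Counter)
    for tweet in tweets:
        # Assumption: "date" looks like "2021-02-19T10:00:00+00:00".
        day = datetime.datetime.fromisoformat(tweet["date"]).date()
        tweets_per_date[day] += 1
        users_per_date[day][tweet["user"]["username"]] += 1
    # Rank dates by total tweets, then pick the top user within each date.
    return [
        (day, users_per_date[day].most_common(1)[0][0])
        for day, _ in tweets_per_date.most_common(n)
    ]
```

Because only two counters are kept, this could be fed a generator such as `(json.loads(line) for line in fh)` instead of a fully materialised list.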


Calling the function `q1_memory(file_path: str)`:

In [5]:
from q1_memory import q1_memory
file_path = "../data/farmers-protest-tweets-2021-2-4.json.gz"
%mprun -f q1_memory q1_memory(file_path)

[(datetime.date(2021, 2, 19), 'Preetm91'), (datetime.date(2021, 2, 18), 'neetuanjle_nitu'), (datetime.date(2021, 2, 17), 'RaaJVinderkaur'), (datetime.date(2021, 2, 13), 'MaanDee08215437'), (datetime.date(2021, 2, 12), 'RanbirS00614606'), (datetime.date(2021, 2, 21), 'Surrypuria'), (datetime.date(2021, 2, 18), 'rebelpacifist'), (datetime.date(2021, 2, 19), 'KaurDosanjh1979'), (datetime.date(2021, 2, 23), 'Surrypuria'), (datetime.date(2021, 2, 15), 'jot__b')]



Filename: /Users/julianromero/Documents/Bench/ProcessTweets/src/q1_memory.py

Line #    Mem usage    Increment  Occurrences   Line Contents
     9   1357.3 MiB   1357.3 MiB           1   def q1_memory(file_path: str) -> List[Tuple[datetime.date, str]]:
    10                                             
    11                                             #Creating SparkSession 
    12   1357.3 MiB      0.0 MiB           1       spark = SparkSession.builder.appName('readJson').getOrCreate()
    13                                             #Read file as pyspark object()   
    14   1357.3 MiB     -0.0 MiB           1       data = spark.read.json(file_path)    
    15                                             #transformation and renaming columns steps
    16   1357.3 MiB      0.0 MiB           2       dfcol = data.withColumn("created_at", data["date"].cast('date'))\
    17   1357.3 MiB      0.0 MiB           1                                   .withColumn("user_id", data["user.id"])\
 

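Calling the function `q1_time(file_path: str)`. Note that this version returns the dates as strings (via `strftime`) rather than the `datetime.date` objects promised by the signature: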

In [10]:
from q1_time import q1_time
file_path = "../data/farmers-protest-tweets-2021-2-4.json"
%mprun -f q1_time q1_time(file_path)

[('2021-02-19', 'Preetm91'), ('2021-02-18', 'neetuanjle_nitu'), ('2021-02-17', 'RaaJVinderkaur'), ('2021-02-13', 'MaanDee08215437'), ('2021-02-12', 'RanbirS00614606'), ('2021-02-21', 'Surrypuria'), ('2021-02-18', 'rebelpacifist'), ('2021-02-19', 'KaurDosanjh1979'), ('2021-02-23', 'Surrypuria'), ('2021-02-15', 'jot__b')]



Filename: /Users/julianromero/Documents/Bench/ProcessTweets/src/q1_time.py

Line #    Mem usage    Increment  Occurrences   Line Contents
    10    141.8 MiB    141.8 MiB           1   def q1_time(file_path: str) -> List[Tuple[datetime.date, str]]:
    11                                             
    12                                             #Read file with json.loads()   
    13   1182.6 MiB   -314.4 MiB      117410       data = [json.loads(line) for line in open(file_path, 'r')]
    14                                             #convert to dataframe
    15   1267.2 MiB     84.6 MiB           1       df = pd.DataFrame(data)
    16                                             #transformation and renaming columns steps
    17   1279.2 MiB     12.0 MiB           1       df["created_at"] = pd.to_datetime(df["date"]).dt.strftime('%Y-%m-%d')
    18   1279.2 MiB      0.0 MiB      234815       df['username'] = df['user'].apply(lambda d: d['username'])
    19   1279.2 MiB      0.0 MiB 

### Solving 2nd Challenge

Top 10 most used emojis with their count.

Keep the following function signatures:
```python
from typing import List, Tuple

def q2_time(file_path: str) -> List[Tuple[str, int]]: ...

def q2_memory(file_path: str) -> List[Tuple[str, int]]: ...
```

### EDA

While exploring the tweets, I found records that share the same `content` and `user.id`, which yields duplicated content. These records appear to be the same tweet, as shown below.

For instance, two records carry the same text posted by the same `user.id` but have different `date` and `id` values.
Looking for other relevant differences, the `quotedTweet` field also has a different `conversationId`:

![json1.png](attachment:json1.png)

![json2.png](attachment:json2.png)

These repeated records affect the results of every question.

The emoji counts below are computed twice: once after dropping the duplicates and once on the entire data set.
Since the repeated records have different `id` and `conversationId` values, the final analysis preserves all the tweets.
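
To list the repeated records described above without pandas, the tweets can be grouped by `(user id, content)`. This is a minimal sketch under the assumption that each tweet dictionary exposes `id`, `content` and `user.id`, as in the records shown; the function name `find_repeated_content` is hypothetical.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple


def find_repeated_content(tweets: Iterable[dict]) -> Dict[Tuple[int, str], List[int]]:
    """Map (user id, content) to the tweet ids that share that pair."""
    ids_by_key: Dict[Tuple[int, str], List[int]] = defaultdict(list)
    for tweet in tweets:
        key = (tweet["user"]["id"], tweet["content"])
        ids_by_key[key].append(tweet["id"])
    # Keep only the pairs that occur in more than one record.
    return {key: ids for key, ids in ids_by_key.items() if len(ids) > 1}
```

Each value in the returned dictionary holds the distinct tweet `id`s that carry identical text from the same user.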

### Analysis Dropping Duplicates

In [12]:
#file_path = "../data/farmers-tweets.json"
file_path = '../data/farmers-protest-tweets-2021-2-4.json'

In [13]:
#Read the file line by line with json.loads(); the context manager closes the handle
with open(file_path, 'r', encoding='utf-8') as fh:
    data = [json.loads(line) for line in fh]
#convert to dataframe
df = pd.DataFrame(data)
#transformation and renaming columns steps    
df['user_id'] = df['user'].apply(lambda d: d['id'])
#Drop tweets that repeat the same content for the same user_id; print the shape before and after
print(df.shape)
df=df.drop_duplicates(subset=['content','user_id'])
print(df.shape)
#dfres = df.groupby(['content','user_id'])['id'].count().reset_index(name="count").sort_values("count", ascending=False)      
#print(dfres.query('count > 1').head(10))  

dfres = df[['content','id']]

text = dfres['content'].str.cat(sep='\n')    
out = (pd.DataFrame(emoji.emoji_list(text)).value_counts('emoji')
        .rename_axis('Smiley').rename('Count').reset_index()
        .assign(Type=lambda x: x['Smiley'].apply(emoji.demojize)))

print(list(out[['Smiley','Count']].head(10).itertuples(index=False, name=None)))

(117407, 22)
(115624, 22)
[('🙏', 4991), ('😂', 3068), ('🚜', 2899), ('🌾', 2164), ('🇮🇳', 2085), ('🤣', 1660), ('✊', 1633), ('❤️', 1378), ('🙏🏻', 1299), ('💚', 1036)]


### Analysis on the Whole Data


In [14]:
#Read the file line by line with json.loads(); the context manager closes the handle
with open(file_path, 'r', encoding='utf-8') as fh:
    data = [json.loads(line) for line in fh]
#convert to dataframe
df = pd.DataFrame(data)
#Dropping duplicates by content and tweet id keeps every row (shapes below are equal),
#so this cell analyses the whole data set
print(df.shape)
df=df.drop_duplicates(subset=['content','id'])
print(df.shape)
#dfres = df.groupby(['content','user_id'])['id'].count().reset_index(name="count").sort_values("count", ascending=False)      
#print(dfres.query('count > 1').head(10))  

dfres = df[['content','id']]

text = dfres['content'].str.cat(sep='\n')    
out = (pd.DataFrame(emoji.emoji_list(text)).value_counts('emoji')
        .rename_axis('Smiley').rename('Count').reset_index()
        .assign(Type=lambda x: x['Smiley'].apply(emoji.demojize)))

print(list(out[['Smiley','Count']].head(10).itertuples(index=False, name=None)))

(117407, 21)
(117407, 21)
[('🙏', 5049), ('😂', 3072), ('🚜', 2972), ('🌾', 2182), ('🇮🇳', 2086), ('🤣', 1668), ('✊', 1651), ('❤️', 1382), ('🙏🏻', 1317), ('💚', 1040)]


### `q2_memory` Improving memory

In [4]:
from q2_memory import q2_memory
start = time.time()
file_path = "../data/farmers-protest-tweets-2021-2-4.json.gz"
%mprun -f q2_memory q2_memory(file_path)
end = time.time()
timer = end - start
print(f"Time elapsed: {timer:.2f} seconds")

[('🙏', 5049), ('😂', 3072), ('🚜', 2972), ('🌾', 2182), ('🇮🇳', 2086), ('🤣', 1668), ('✊', 1651), ('❤️', 1382), ('🙏🏻', 1317), ('💚', 1040)]

Time elapsed: 69.50 seconds


Filename: /Users/julianromero/Documents/Bench/ProcessTweets/src/q2_memory.py

Line #    Mem usage    Increment  Occurrences   Line Contents
    11   1279.8 MiB   1279.8 MiB           1   def q2_memory(file_path: str) -> List[Tuple[str, int]]:
    12                                             #Creating SparkSession 
    13   1279.8 MiB      0.0 MiB           1       spark = SparkSession.builder.appName('readJson').getOrCreate()
    14                                             #Read file as pyspark object()   
    15   1279.9 MiB      0.0 MiB           1       data = spark.read.json(file_path)            
    16                                             #Selection only the columns to process to enhance time and usage memory consumption
    17   1279.9 MiB      0.0 MiB           1       dfaux = data.select(col("content"), col("id"))
    18   1290.3 MiB     10.4 MiB           1       df = dfaux.toPandas() 
    19                                             #Creating string for each co

In [29]:
#!sudo py-spy top -- python q2_memory.py
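
The elapsed times in this notebook are measured around the `%mprun` call, so they include the line profiler's overhead. To time a function on its own, a small helper like the following could be used. It is only a sketch; the name `timed` is hypothetical.

```python
import time
from contextlib import contextmanager


@contextmanager
def timed(label: str):
    """Measure wall-clock time of the enclosed block and print it."""
    result = {"label": label, "seconds": None}
    start = time.perf_counter()
    try:
        yield result
    finally:
        # perf_counter is monotonic, so the difference is a safe duration.
        result["seconds"] = time.perf_counter() - start
        print(f"{label}: {result['seconds']:.2f} seconds")
```

Usage would look like `with timed("q2_memory"): q2_memory(file_path)`, run in a cell without `%mprun`.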

### `q2_time` Improving time

In [30]:
from q2_time import q2_time
start = time.time()
file_path = "../data/farmers-protest-tweets-2021-2-4.json"
%mprun -f q2_time q2_time(file_path)
end = time.time()
timer = end - start
print(f"Time elapsed: {timer:.2f} seconds")

(117407, 21)
[('🙏', 5049), ('😂', 3072), ('🚜', 2972), ('🌾', 2182), ('🇮🇳', 2086), ('🤣', 1668), ('✊', 1651), ('❤️', 1382), ('🙏🏻', 1317), ('💚', 1040)]

Time elapsed: 73.39 seconds


Filename: /Users/julianromero/Documents/Bench/ProcessTweets/src/q2_time.py

Line #    Mem usage    Increment  Occurrences   Line Contents
    11   1210.4 MiB   1210.4 MiB           1   def q2_time(file_path: str) -> List[Tuple[str, int]]:
    12                                             
    13                                             ##Read file with json.loads()   
    14   2272.3 MiB -1495704.4 MiB      117410       data = [json.loads(line) for line in open(file_path, 'r')]
    15                                             #convert to dataframe
    16   2362.9 MiB     90.6 MiB           1       df = pd.DataFrame(data)
    17                                             #Selection only the columns to process to enhance time and usage memory consumption
    18                                             dfres = df[['content','id']]
    19                                             #Creating string for each content
    20                                             text = dfres['

### Solving 3rd Challenge

Top 10 users by number of mentions (@), with their mention counts.

Keep the following function signatures:
```python
from typing import List, Tuple

def q3_time(file_path: str) -> List[Tuple[str, int]]: ...

def q3_memory(file_path: str) -> List[Tuple[str, int]]: ...
```
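
As an illustration of how mentions can be counted from the `content` field, here is a minimal regex-based sketch. The pattern and the function name `top_mentions` are assumptions made for this example; the functions in `src` may extract mentions differently, so the counts need not match exactly.

```python
import re
from collections import Counter
from typing import Iterable, List, Tuple

# Assumed handle shape: 1-15 letters, digits or underscores after "@".
# The lookbehind skips e-mail-like strings such as "name@example.com".
MENTION_RE = re.compile(r"(?<![A-Za-z0-9_])@([A-Za-z0-9_]{1,15})")


def top_mentions(contents: Iterable[str], n: int = 10) -> List[Tuple[str, int]]:
    """Return the n most mentioned usernames with their counts."""
    counts: Counter = Counter()
    for text in contents:
        # Update per tweet so the texts never need to be concatenated.
        counts.update(MENTION_RE.findall(text))
    return counts.most_common(n)
```

Counting per tweet keeps memory bounded by the number of distinct usernames rather than by the total text size.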

### `q3_memory` Improving memory

In [7]:
from q3_memory import q3_memory
start = time.time()
file_path = "../data/farmers-protest-tweets-2021-2-4.json.gz"
%mprun -f q3_memory q3_memory(file_path)
end = time.time()
timer = end - start
print(f"Time elapsed: {timer:.2f} seconds")

[('narendramodi', 2261), ('Kisanektamorcha', 1836), ('RakeshTikaitBKU', 1639), ('PMOIndia', 1422), ('RahulGandhi', 1125), ('GretaThunberg', 1046), ('RaviSinghKA', 1015), ('rihanna', 972), ('UNHumanRights', 962), ('meenaharris', 925)]

Time elapsed: 4.52 seconds


Filename: /Users/julianromero/Documents/Bench/ProcessTweets/src/q3_memory.py

Line #    Mem usage    Increment  Occurrences   Line Contents
     9    465.7 MiB    465.7 MiB           1   def q3_memory(file_path: str) -> List[Tuple[str, int]]:
    10                                             
    11                                             #Creating SparkSession 
    12    465.7 MiB      0.0 MiB           1       spark = SparkSession.builder.appName('readJson').getOrCreate()
    13                                             #Read file as pyspark object()   
    14    465.7 MiB      0.0 MiB           1       data = spark.read.json(file_path)            
    15                                             #Selection only the columns to process to enhance time and usage memory consumption
    16    465.7 MiB      0.0 MiB           1       dfaux = data.select(col("content"))
    17    468.9 MiB      3.2 MiB           1       dfres = dfaux.toPandas() 
    18    468.9 MiB      0.0 MiB   

### `q3_time` Improving time

In [3]:
from q3_time import q3_time
start = time.time()
file_path = "../data/farmers-protest-tweets-2021-2-4.json"
%mprun -f q3_time q3_time(file_path)
end = time.time()
timer = end - start
print(f"Time elapsed: {timer:.2f} seconds")

[('narendramodi', 2261), ('Kisanektamorcha', 1836), ('RakeshTikaitBKU', 1639), ('PMOIndia', 1422), ('RahulGandhi', 1125), ('GretaThunberg', 1046), ('RaviSinghKA', 1015), ('rihanna', 972), ('UNHumanRights', 962), ('meenaharris', 925)]

Time elapsed: 5.77 seconds


Filename: /Users/julianromero/Documents/Bench/ProcessTweets/src/q3_time.py

Line #    Mem usage    Increment  Occurrences   Line Contents
    10    233.0 MiB    233.0 MiB           1   def q3_time(file_path: str) -> List[Tuple[str, int]]:
    11                                             
    12                                             ##Read file with json.loads()   
    13   1189.5 MiB -66426.5 MiB      117410       data = [json.loads(line) for line in open(file_path, 'r')]
    14                                             #convert to dataframe
    15   1269.4 MiB     79.9 MiB           1       dfaux = pd.DataFrame(data)
    16                                             #Selection only the columns to process to enhance time and usage memory consumption
    17   1269.6 MiB      0.2 MiB           1       dfres = dfaux[['content']]
    18                                             #Creating string for each content    
    19   1269.6 MiB      0.0 MiB           1       """ Extract