In [2]:
import pyspark
import pyspark.sql.functions as sf
from pyspark.sql import Window
from typing import Literal
import os
import pandas as pd

In [13]:
from pyspark.sql import DataFrame

def pipe(self, func, *args, **kwargs):
    """Call ``func`` with this object as its first argument.

    Any extra positional and keyword arguments are forwarded to ``func``
    unchanged, and whatever ``func`` returns is returned to the caller. This
    lets free functions be chained in method style.
    """
    piped_result = func(self, *args, **kwargs)
    return piped_result

# Monkey-patch: expose `pipe` as a method on every Spark DataFrame so the
# transformation functions in this notebook can be chained with `.pipe(...)`.
DataFrame.pipe = pipe 

In [3]:
# Directory handed to load_json_data; every entry in it is read as JSON.
data_dir = 'data/Tweets/'
# Spark master URL; 'local[2]' runs Spark in-process with two worker threads.
master = 'local[2]'

# Reuse the active SparkSession if there is one, otherwise create it on `master`.
spark = (
    pyspark.sql.SparkSession.builder
    .master(master) 
    .getOrCreate()
)
# Bare expression: renders the session summary as the cell output.
spark

22/09/09 15:46:09 WARN Utils: Your hostname, Admins-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 172.17.4.16 instead (on interface en0)
22/09/09 15:46:09 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/09/09 15:46:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
def load_json_data(dir: str) -> pyspark.sql.dataframe.DataFrame:
    """Read every entry of a directory as JSON into one Spark DataFrame.

    Args:
        dir: Directory whose entries are loaded. Every name returned by
            ``os.listdir`` is included (there is no filtering by extension
            or file type), so the directory should contain only JSON files.

    Returns:
        The DataFrame returned by ``spark.read.json`` for the collected paths.

    Raises:
        OSError: Propagated from ``os.listdir`` when ``dir`` cannot be listed
            (for example, when it does not exist).

    Note:
        Relies on the notebook-level ``spark`` session rather than taking the
        session as an argument.
    """
    # Join each entry name with the directory so Spark receives usable paths.
    file_list = [os.path.join(dir, file) for file in os.listdir(dir)]
    return spark.read.json(file_list)
        
# Raw tweets with their full nested JSON structure (see printSchema at the end).
nested_df = load_json_data(data_dir)

                                                                                

22/09/09 15:46:30 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


In [5]:
def filter_cols(spark_df: pyspark.sql.dataframe.DataFrame, 
                columns: list) -> pyspark.sql.dataframe.DataFrame:
    """Project a DataFrame onto the given columns.

    Args:
        spark_df: DataFrame to select from.
        columns: Column names passed as one list to ``select``.

    Returns:
        The result of ``spark_df.select(columns)``.
    """
    selected_df = spark_df.select(columns)
    return selected_df

# Keep the tweet timestamp plus the hashtag texts. Selecting the nested path
# "entities.hashtags.text" yields an array column that ends up named "text".
filtered_df = filter_cols(nested_df, ["created_at", "entities.hashtags.text"])
# Preview only the first 10 rows as a pandas frame.
filtered_df.limit(10).toPandas()

Unnamed: 0,created_at,text
0,Thu Feb 18 14:49:30 +0000 2021,[]
1,Thu Feb 18 14:49:30 +0000 2021,[]
2,Thu Feb 18 14:49:30 +0000 2021,[]
3,Thu Feb 18 14:49:30 +0000 2021,[]
4,Thu Feb 18 14:49:30 +0000 2021,[]
5,Thu Feb 18 14:49:30 +0000 2021,[]
6,Thu Feb 18 14:49:30 +0000 2021,[]
7,Thu Feb 18 14:49:30 +0000 2021,[]
8,Thu Feb 18 14:49:30 +0000 2021,[]
9,Thu Feb 18 14:49:30 +0000 2021,"[missingperson, helpfindrat]"


In [6]:
def rename_cols(spark_df: pyspark.sql.dataframe.DataFrame,
                mapping:dict) -> pyspark.sql.dataframe.DataFrame:
    """Rename columns according to an ``{old_name: new_name}`` mapping.

    The renames are applied one after another in the mapping's iteration
    order, each via ``withColumnRenamed``. An empty mapping returns the input
    DataFrame unchanged.
    """
    renamed_df = spark_df
    for source_name in mapping:
        renamed_df = renamed_df.withColumnRenamed(source_name, mapping[source_name])
    return renamed_df

# "text" holds the list of hashtag strings of each tweet; give it a clearer name.
renamed_df = rename_cols(filtered_df,
                         {'text': 'hashtags'})
renamed_df.limit(5).toPandas()


Unnamed: 0,created_at,hashtags
0,Thu Feb 18 14:49:30 +0000 2021,[]
1,Thu Feb 18 14:49:30 +0000 2021,[]
2,Thu Feb 18 14:49:30 +0000 2021,[]
3,Thu Feb 18 14:49:30 +0000 2021,[]
4,Thu Feb 18 14:49:30 +0000 2021,[]


In [7]:
def convert_to_datecol(spark_df: pyspark.sql.dataframe.DataFrame,
                   col: str,
                   format: str) -> pyspark.sql.dataframe.DataFrame:
    """Replace a string column with its value parsed as a timestamp.

    Args:
        spark_df: DataFrame containing the column to parse.
        col: Name of the column; it is overwritten in the returned DataFrame.
        format: Datetime pattern handed to ``sf.to_timestamp``.

    Returns:
        A DataFrame in which ``col`` holds the parsed timestamps.

    Note:
        As a side effect, this sets ``spark.sql.legacy.timeParserPolicy`` to
        ``LEGACY`` on the notebook-level ``spark`` session, and the setting
        stays in place after the call.
    """
    # Switch the session to the legacy datetime parser before parsing.
    spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")
    parsed_timestamp = sf.to_timestamp(col, format=format)
    return spark_df.withColumn(col, parsed_timestamp)

# Parse strings such as "Thu Feb 18 14:49:30 +0000 2021" into timestamps.
preproccesed_df = convert_to_datecol(renamed_df, 'created_at', 'EEE MMM dd HH:mm:ss ZZZZZ yyyy')
preproccesed_df.limit(10).toPandas()
    

Unnamed: 0,created_at,hashtags
0,2021-02-18 15:49:30,[]
1,2021-02-18 15:49:30,[]
2,2021-02-18 15:49:30,[]
3,2021-02-18 15:49:30,[]
4,2021-02-18 15:49:30,[]
5,2021-02-18 15:49:30,[]
6,2021-02-18 15:49:30,[]
7,2021-02-18 15:49:30,[]
8,2021-02-18 15:49:30,[]
9,2021-02-18 15:49:30,"[missingperson, helpfindrat]"


In [38]:
# NOTE(review): truncate_date, explode_group_count, add_change_per_time_unit and
# set_abs_thresholds_for_rel_change are defined in a cell that appears further
# down in this notebook; that cell has to be executed before this one.
piped_df = (
    preproccesed_df
    # add an 'hour' column: created_at truncated to the hour
    .pipe(truncate_date, 'hour', 'created_at')
    # one row per (hour, hashtag) with its number of occurrences in 'count'
    .pipe(explode_group_count, 'hashtags', ['hour', 'hashtags'])
    # per hashtag, ordered by hour: count of the previous row (the previous hour
    # in which the hashtag occurred, not necessarily the preceding clock hour)
    # and the relative change to it; this pre-threshold frame is the one cached
    .pipe(add_change_per_time_unit, 'count', 'hashtags', 'hour', 1).cache()
    # keep rel_change only where count >= 50 and the previous count >= 5
    .pipe(set_abs_thresholds_for_rel_change, 'count', 'n_previous_value', 50, 5)
)
piped_df.limit(10).toPandas()

22/09/09 17:04:27 WARN CacheManager: Asked to cache already cached data.


Unnamed: 0,hour,hashtags,count,n_previous_value,rel_change
0,2020-09-18 14:00:00,105LIVE,1,,
1,2020-09-18 14:00:00,1970s,1,,
2,2020-09-17 15:00:00,1DDrive,2,,
3,2020-09-17 16:00:00,1DDrive,1,2.0,
4,2020-09-18 11:00:00,1DDrive,2,1.0,
5,2020-09-18 14:00:00,1DDrive,5,2.0,
6,2020-09-18 15:00:00,1DDrive,1,5.0,
7,2020-09-18 16:00:00,1DDrive,1,1.0,
8,2021-02-19 10:00:00,1DDrive,1,1.0,
9,2021-02-19 14:00:00,1DDrive,2,1.0,


In [33]:
# Window over all rows that share the same 'hour'. It has no ordering, so
# despite its name it is not a rolling window.
rolling_window_2 = Window.partitionBy('hour')
(
    piped_df
    # largest rel_change within each hour
    .withColumn('max_rel_change',
                sf.max('rel_change').over(rolling_window_2))
    # keep the row(s) that reach that maximum; rows whose rel_change is null
    # never satisfy the equality and are dropped
    .filter(
        sf.col('rel_change') == sf.col('max_rel_change')
    )
    .sort(['hour'], ascending=[True]) 
).toPandas()

                                                                                

Unnamed: 0,hour,hashtags,count,n_previous_value,rel_change,max_rel_change
0,2020-09-17 16:00:00,UniteForIOKandPalestine,120,23,4.217391,4.217391
1,2020-09-17 17:00:00,이달의소녀,178,7,24.428571,24.428571
2,2020-09-18 09:00:00,Valimai,100,35,1.857143,1.857143
3,2020-09-18 10:00:00,काला_कानून_संविदा_वापस_लो,67,6,10.166667,10.166667
4,2020-09-18 11:00:00,TREASURE_사랑해_ILoveYou,166,9,17.444444,17.444444
5,2020-09-18 13:00:00,ILANDTheFinale,272,6,44.333333,44.333333
6,2020-09-18 14:00:00,AOTD41,68,5,12.6,12.6
7,2020-09-18 15:00:00,Dynamite,219,36,5.083333,5.083333
8,2020-09-18 16:00:00,BTS,1831,114,15.061404,15.061404
9,2021-02-18 16:00:00,NCT127,872,14,61.285714,61.285714


In [25]:
# NOTE(review): `rolling_window` is only referenced by the commented-out draft
# pipeline at the end of this cell; it partitions by 'hashtag' (singular), the
# column name used in that draft.
rolling_window = Window.partitionBy('hashtag').orderBy('hour')
# Same definition as the `rolling_window_2` created in the cell that selects
# the top hashtag per hour.
rolling_window_2 = Window.partitionBy('hour')


def truncate_date(spark_df: pyspark.sql.dataframe.DataFrame,
                  time_unit: str,
                  col: str) -> pyspark.sql.dataframe.DataFrame:
    """Add a column holding ``col`` truncated to the given time unit.

    Args:
        spark_df: DataFrame containing the timestamp column.
        time_unit: Unit passed to ``sf.date_trunc`` (e.g. ``'hour'``). It is
            also used as the name of the added column.
        col: Name of the column to truncate.

    Returns:
        ``spark_df`` with the extra column named after ``time_unit``.
    """
    truncated = sf.date_trunc(time_unit, sf.col(col))
    return spark_df.withColumn(time_unit, truncated)
    
def explode_group_count(spark_df: pyspark.sql.dataframe.DataFrame,
                        explode_col: str,
                        groupby_cols: list) -> pyspark.sql.dataframe.DataFrame:
    """Explode an array column and count rows per group.

    Args:
        spark_df: DataFrame containing the array column.
        explode_col: Column passed to ``sf.explode``; it is replaced in place
            by the exploded values, one row per element.
        groupby_cols: Columns to group by after exploding.

    Returns:
        One row per distinct combination of ``groupby_cols`` with the row
        count in a ``count`` column.
    """
    exploded_df = spark_df.withColumn(explode_col, sf.explode(explode_col))
    grouped = exploded_df.groupBy(groupby_cols)
    return grouped.count()
    
def add_change_per_time_unit(spark_df: pyspark.sql.dataframe.DataFrame,
                             col_for_stats: str,
                             partition_col: str,
                             time_col: str,
                             window_size: int = 1,
                             prev_value_col: str = 'n_previous_value',
                             rel_change_col: str = 'rel_change') -> pyspark.sql.dataframe.DataFrame:
    """Add the value from ``window_size`` rows earlier and the relative change to it.

    Rows are partitioned by ``partition_col`` and ordered by ``time_col``.
    The lag counts rows, not time units: if a partition has no row for some
    time step, the "previous" value comes from the last row that exists.

    Args:
        spark_df: DataFrame holding one row per partition value and time step.
        col_for_stats: Numeric column whose change is measured.
        partition_col: Column defining the independent series.
        time_col: Column that orders the rows inside each series.
        window_size: Number of rows to look back.
        prev_value_col: Name of the added column with the lagged value.
        rel_change_col: Name of the added column with
            ``(current - previous) / previous``.

    Returns:
        ``spark_df`` with the two extra columns. Both are null for rows that
        have no row ``window_size`` positions earlier in their partition.
    """
    roll_window = Window.partitionBy(partition_col).orderBy(time_col)

    # value of the nth previous row within the same partition
    previous_value = sf.lag(col_for_stats, window_size).over(roll_window)
    # relative change, computed from the column added in the first step
    relative_change = (
        (sf.col(col_for_stats) - sf.col(prev_value_col)) / sf.col(prev_value_col)
    )

    return (
        spark_df
        .withColumn(prev_value_col, previous_value)
        .withColumn(rel_change_col, relative_change)
    )

def set_abs_thresholds_for_rel_change(spark_df: pyspark.sql.dataframe.DataFrame,
                                        col:str,
                                        prev_value_col: str,
                                        min_abs_value: int = 1,
                                        min_prev_abs_value: int = 1,
                                        rel_change_col: str = 'rel_change') -> pyspark.sql.dataframe.DataFrame:
    """Blank out relative changes that are based on too small absolute values.

    Args:
        spark_df: DataFrame containing ``col``, ``prev_value_col`` and
            ``rel_change_col``.
        col: Column with the current absolute value.
        prev_value_col: Column with the previous absolute value.
        min_abs_value: Minimum value of ``col`` for the change to be kept.
        min_prev_abs_value: Minimum value of ``prev_value_col`` for the
            change to be kept.
        rel_change_col: Column that is overwritten with the filtered change.

    Returns:
        ``spark_df`` where ``rel_change_col`` is null (not NaN) unless both
        thresholds are met; ``sf.when`` without ``otherwise`` yields null for
        rows that do not match.
    """
    current_large_enough = sf.col(col) >= min_abs_value
    previous_large_enough = sf.col(prev_value_col) >= min_prev_abs_value

    # step 1: null the change where the current value is below min_abs_value
    gated_on_current = spark_df.withColumn(
        rel_change_col,
        sf.when(current_large_enough, sf.col(rel_change_col)),
    )
    # step 2: null the change where the previous value is below min_prev_abs_value
    return gated_on_current.withColumn(
        rel_change_col,
        sf.when(previous_large_enough, sf.col(rel_change_col)),
    )

# (
#     preproccesed_df
#     .withColumn('hour', sf.date_trunc('hour', sf.col('created_at')))
#     .withColumn('hashtag', sf.explode(sf.col('hashtags')))
#     .groupBy('hour', 'hashtag') 
#     .count()
#     .withColumn('previous_value',
#                 )
#     .withColumn('rel_change',
#                 (sf.col('count') - sf.lag('count',1).over(rolling_window))/sf.lag('count',1).over(rolling_window))
    
#     .withColumn('rel_change',
#                sf.when(sf.col('count') < 50, sf.lit(None))
#                .otherwise(sf.col('rel_change')))
    
#      .withColumn('rel_change',
#                sf.when(sf.lag('count',1).over(rolling_window) < 5, sf.lit(None))
#                .otherwise(sf.col('rel_change')))

#     .withColumn('max_rel_change',
#                 sf.max('rel_change').over(rolling_window_2))
#     .filter(
#         sf.col('rel_change') == sf.col('max_rel_change')
#     )
#     .sort(['hour'], ascending=[True]) 
# ).toPandas()

In [71]:
# Scratch check: confirm that select(...).limit(...) still yields a Spark DataFrame.
type((
    nested_df
    .select(["created_at", "text"])
    
    # .select("extended_tweet")
    # .filter(
    #     sf.col("extended_tweet") == sf.col("extended_tweet")
    # )
).limit(10))

pyspark.sql.dataframe.DataFrame

In [None]:
# Print the full nested schema of the raw tweets for reference.
nested_df.printSchema()

root
 |-- contributors: string (nullable = true)
 |-- coordinates: struct (nullable = true)
 |    |-- coordinates: array (nullable = true)
 |    |    |-- element: double (containsNull = true)
 |    |-- type: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- display_text_range: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- entities: struct (nullable = true)
 |    |-- hashtags: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- indices: array (nullable = true)
 |    |    |    |    |-- element: long (containsNull = true)
 |    |    |    |-- text: string (nullable = true)
 |    |-- media: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- additional_media_info: struct (nullable = true)
 |    |    |    |    |-- description: string (nullable = true)
 |    |    |    |    |-- embeddable: boolean (nullable = true)
 |    |    |    |    |-- monetizable: bo