In [0]:
from pyspark.sql import SparkSession

# Reuse the active Spark session or start a new one. The driver result-size
# cap is set to 18g (presumably to allow very large collects to the driver —
# TODO confirm it is still needed).
spark = (
    SparkSession.builder
    .config("spark.driver.maxResultSize", "18g")
    .getOrCreate()
)

In [0]:
from pyspark.sql.functions import lit

In [0]:
import pyspark.sql.functions as F

In [0]:
def handler(sparkdf):
    """Return the rows of ``sparkdf`` whose ``title`` matches the keyword pattern.

    The filter is applied inside Spark with ``Column.rlike`` instead of
    collecting the whole DataFrame to the driver with ``toPandas()`` and
    rebuilding a Spark DataFrame afterwards. This avoids the driver-memory
    cost of the collect and also works when no row matches.

    Relies on the notebook-level ``F`` alias (``pyspark.sql.functions``).

    Args:
        sparkdf: A Spark DataFrame with a ``title`` column.

    Returns:
        A Spark DataFrame containing only the rows whose ``title`` contains at
        least one of the keywords. Rows with a null ``title`` are dropped.
    """
    # NOTE(review): the match is case-sensitive, so a title such as "Ukraine"
    # is not matched by the lowercase pattern — confirm that is intended.
    keyword_pattern = "ukraine|russia|russian|ukraina|putin|kyiv|moscow|invasion|vladimir"
    return sparkdf.filter(F.col("title").rlike(keyword_pattern))

In [0]:
def read_dataset(year_range = [2015, 2016, 2017], month_range = range(1, 13), comments_or_submissions = "submissions"):
    """Filter monthly Reddit dumps by title keywords and save the union as parquet.

    For every (year, month) pair the file
    ``/mnt/lsde/datasets/reddit/<kind>/R?_<year>-<month>.json.bz2`` is read as
    JSON, rows whose ``author`` or ``title`` equals ``[deleted]`` are dropped,
    five columns are kept, rows whose ``title`` does not match the keyword
    regex are dropped, and a ``subtime`` column holding ``"<year>-<month>"``
    is added. The monthly frames are unioned and written (mode ``overwrite``)
    to ``/mnt/lsde/group05/<kind>/<years>.parquet``.

    Relies on the notebook-level names ``spark``, ``F`` and ``lit``.

    Args:
        year_range: Iterable of years to process.
        month_range: Iterable of integer month numbers processed for every year.
        comments_or_submissions: ``"comments"`` or ``"submissions"``; selects
            the input sub-directory / file prefix and the output sub-directory.

    Returns:
        The unioned Spark DataFrame with columns ``author``, ``title``,
        ``subreddit``, ``subreddit_id``, ``id`` and ``subtime``.

    Raises:
        ValueError: If ``comments_or_submissions`` is not one of the two
            supported values, or if either range is empty.
    """
    if comments_or_submissions == "comments":
        # NOTE(review): the pipeline below filters on a `title` column for
        # comments as well — verify that the comment dumps actually have one.
        link = "comments/RC_"
    elif comments_or_submissions == "submissions":
        link = "submissions/RS_"
    else:
        # Fail up front instead of building a path that cannot exist.
        raise ValueError(
            "comments_or_submissions must be 'comments' or 'submissions', got "
            + repr(comments_or_submissions)
        )

    # Materialise both ranges: it allows the emptiness check below and keeps a
    # one-shot iterator passed as month_range usable for every year.
    years = list(year_range)
    months = list(month_range)
    if not years or not months:
        raise ValueError("year_range and month_range must both be non-empty")

    # NOTE(review): rlike is case-sensitive, so titles such as "Ukraine" are
    # not matched by this lowercase pattern — confirm that is intended.
    substrings = "ukraine|russia|russian|ukraina|putin|kyiv|moscow|invasion|vladimir"

    result = None
    for year in years:
        for month in months:
            subtime = str(year) + "-" + str(month).zfill(2)
            data_path = "/mnt/lsde/datasets/reddit/" + link + subtime + ".json.bz2"
            print("handling" + subtime)

            # samplingRatio only affects JSON schema inference.
            # NOTE(review): with such a tiny ratio very few records are
            # inspected — confirm the inferred schema is reliable.
            monthly = (
                spark.read.option("samplingRatio", 0.000001)
                .load(data_path, format="json")
                .filter("author!='[deleted]'")
                .filter("title!='[deleted]'")
                .select("author", "title", "subreddit", "subreddit_id", "id")
                .filter(F.col("title").rlike(substrings))
                .withColumn("subtime", lit(subtime))
            )
            result = monthly if result is None else result.union(monthly)

    # Name the output after every distinct requested year (joined with "_")
    # rather than after whichever year the loop happened to finish on. For a
    # single year this is still "<year>.parquet".
    year_labels = []
    for year in years:
        label = str(year)
        if label not in year_labels:
            year_labels.append(label)
    output_path = "/mnt/lsde/group05/" + comments_or_submissions + "/" + "_".join(year_labels) + ".parquet"
    result.write.mode("overwrite").format("parquet").save(output_path)

    return result

In [0]:
def read_dataset_withpath(year_range = [2015, 2016, 2017], month_range = range(1, 13), comments_or_submissions = "submissions",path=""):
    """Filter monthly Reddit dumps by title keywords and save them under ``path``.

    For every (year, month) pair the file
    ``/mnt/lsde/datasets/reddit/<kind>/R?_<year>-<month>.json.bz2`` is read as
    JSON, rows whose ``author`` or ``title`` equals ``[deleted]`` are dropped,
    five columns are kept, a ``subtime`` column holding ``"<year>-<month>"``
    is added and rows whose ``title`` does not match the keyword regex are
    dropped. The monthly frames are unioned and written (mode ``overwrite``)
    to ``/mnt/lsde/group05/<kind>/<path>.parquet``.

    Relies on the notebook-level names ``spark``, ``F`` and ``lit``.

    Args:
        year_range: Iterable of years to process.
        month_range: Iterable of integer month numbers processed for every year.
        comments_or_submissions: ``"comments"`` or ``"submissions"``; selects
            the input sub-directory / file prefix and the output sub-directory.
        path: Output file stem placed before ``.parquet``.

    Returns:
        The unioned Spark DataFrame with columns ``author``, ``title``,
        ``subreddit``, ``subreddit_id``, ``id`` and ``subtime``.

    Raises:
        ValueError: If ``comments_or_submissions`` is not one of the two
            supported values, or if either range is empty.
    """
    if comments_or_submissions == "comments":
        # NOTE(review): the pipeline below filters on a `title` column for
        # comments as well — verify that the comment dumps actually have one.
        link = "comments/RC_"
    elif comments_or_submissions == "submissions":
        link = "submissions/RS_"
    else:
        # Fail up front instead of building a path that cannot exist.
        raise ValueError(
            "comments_or_submissions must be 'comments' or 'submissions', got "
            + repr(comments_or_submissions)
        )

    # Materialise both ranges: it allows the emptiness check below and keeps a
    # one-shot iterator passed as month_range usable for every year.
    years = list(year_range)
    months = list(month_range)
    if not years or not months:
        raise ValueError("year_range and month_range must both be non-empty")

    # NOTE(review): rlike is case-sensitive, so titles such as "Ukraine" are
    # not matched by this lowercase pattern — confirm that is intended.
    substrings = "ukraine|russia|russian|ukraina|putin|kyiv|moscow|invasion|vladimir"

    result = None
    for year in years:
        for month in months:
            subtime = str(year) + "-" + str(month).zfill(2)
            data_path = "/mnt/lsde/datasets/reddit/" + link + subtime + ".json.bz2"
            print("handling" + subtime)

            monthly = (
                spark.read.load(data_path, format="json")
                .filter("author!='[deleted]'")
                .filter("title!='[deleted]'")
                .select("author", "title", "subreddit", "subreddit_id", "id")
                .withColumn("subtime", lit(subtime))
                .filter(F.col("title").rlike(substrings))
            )
            result = monthly if result is None else result.union(monthly)

    output_path = "/mnt/lsde/group05/" + comments_or_submissions + "/" + path + ".parquet"
    result.write.mode("overwrite").format("parquet").save(output_path)

    return result

In [0]:
# Extract and save the keyword-filtered submissions for months 2-12 of 2014.
sub_2014 = read_dataset(
    year_range=[2014],
    month_range=range(2, 13),
    comments_or_submissions="submissions",
)

In [0]:
# Extract and save the keyword-filtered submissions for months 1-12 of 2015.
sub_2015 = read_dataset(
    year_range=[2015],
    month_range=range(1, 13),
    comments_or_submissions="submissions",
)

In [0]:
# Extract and save the keyword-filtered submissions for months 1-12 of 2016.
sub_2016 = read_dataset(
    year_range=[2016],
    month_range=range(1, 13),
    comments_or_submissions="submissions",
)

In [0]:
# Extract and save the keyword-filtered submissions for months 1-12 of 2017.
sub_2017 = read_dataset(
    year_range=[2017],
    month_range=range(1, 13),
    comments_or_submissions="submissions",
)

In [0]:
# Extract and save the keyword-filtered submissions for months 1-12 of 2018.
sub_2018 = read_dataset(
    year_range=[2018],
    month_range=range(1, 13),
    comments_or_submissions="submissions",
)

In [0]:
# Extract and save the keyword-filtered submissions for months 1-12 of 2019.
sub_2019 = read_dataset(
    year_range=[2019],
    month_range=range(1, 13),
    comments_or_submissions="submissions",
)

In [0]:
# Extract February 2019 on its own and save it under the "2019-02" file stem.
sub_2019_02 = read_dataset_withpath(
    year_range=[2019],
    month_range=range(2, 3),
    comments_or_submissions="submissions",
    path="2019-02",
)

In [0]:
# Extract and save the keyword-filtered submissions for months 1-12 of 2020.
sub_2020 = read_dataset(
    year_range=[2020],
    month_range=range(1, 13),
    comments_or_submissions="submissions",
)

In [0]:
 # Extract and save the keyword-filtered submissions for months 1-12 of 2021.
 sub_2021 = read_dataset(
     year_range=[2021],
     month_range=range(1, 13),
     comments_or_submissions="submissions",
 )

In [0]:
# Extract and save the keyword-filtered submissions for months 1-8 of 2022.
sub_2022 = read_dataset(
    year_range=[2022],
    month_range=range(1, 9),
    comments_or_submissions="submissions",
)