In [16]:
from pyspark.sql import SparkSession

# Create the Spark session for this notebook (getOrCreate reuses an already
# running session instead of starting a second one). The spark-bigquery
# connector jar is attached so the 'bigquery' data source used by the write
# cell further down can be resolved.
spark = (
    SparkSession.builder
    .appName("pyspark-basics")
    .config(
        "spark.jars",
        "gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar",
    )
    .getOrCreate()
)

# Load the posts export from GCS. The header option takes column names from
# the first row; no schema is supplied and schema inference is not enabled,
# so every column is read as a string and is cast explicitly later on.
df = spark.read.option("header", True).csv("gs://pyspark-fs-sid/stackoverflowposts.csv")

# Keep only the three columns the rest of the notebook works with.
df_subset = df.select("tags", "view_count", "creation_date")

In [2]:
from pyspark.sql import functions as F

# Label every post as 'python' or "others" based on its tags value.
# The LIKE pattern 'python%' only matches values that BEGIN with "python";
# rows with a null tags value fall through to "others".
# NOTE(review): a post whose tags value mentions python anywhere other than
# the start is labelled "others" -- confirm that this is the intended rule.
is_python_post = F.col('tags').like('python%')

df_tagged = df_subset.withColumn(
    'post_type',
    F.when(is_python_post, 'python').otherwise("others"),
)

In [3]:
from pyspark.sql.types import TimestampType,IntegerType
from pyspark.sql.functions import year,month

# Step 1: give the two string columns proper types. Values that cannot be
# cast become null rather than raising.
posts_typed = (
    df_tagged
    .withColumn("creation_date", df_tagged["creation_date"].cast(TimestampType()))
    .withColumn("view_count", df_tagged["view_count"].cast(IntegerType()))
)

# Step 2: derive the calendar year and month from the (now timestamp)
# creation_date so posts can be grouped per month.
df_tagged = (
    posts_typed
    .withColumn("creation_year", year(posts_typed["creation_date"]))
    .withColumn("creation_month", month(posts_typed["creation_date"]))
)

In [None]:
# df_tagged = df_tagged \
#     .filter(
#         ( df_tagged.creation_year==2015 )
#         & 
#         ( df_tagged.post_type=='python')
#     )

In [4]:
# Narrow the frame to the grouping keys plus the measure to be summed;
# tags and creation_date are dropped from here on.
df_tagged = df_tagged.select(
    "post_type",
    "creation_year",
    "creation_month",
    "view_count",
)

In [5]:
# Collapse to one row per (post_type, creation_year, creation_month) holding
# the total view_count for that month as monthly_views.
# Note: this rebinds df_tagged to the aggregated frame, so the cell cannot be
# re-run on its own -- view_count no longer exists after the first run.
monthly_keys = ["post_type", "creation_year", "creation_month"]

df_tagged = df_tagged.groupBy(*monthly_keys).agg(
    F.sum("view_count").alias("monthly_views")
)

In [6]:
# df_tagged.show()

[Stage 1:>                                                          (0 + 1) / 1]

+---------+-------------+--------------+-------------+
|post_type|creation_year|creation_month|monthly_views|
+---------+-------------+--------------+-------------+
|   others|         2016|             2|         2239|
|   others|         2013|            12|        25193|
|   others|         2010|            11|        33631|
|   others|         2014|             8|        17015|
|   python|         2012|             1|          876|
|   others|         2009|             3|        13244|
|   python|         2012|             9|         1546|
|   others|         2013|             7|        73608|
|   others|         2013|             8|        31800|
|   others|         2013|             9|        18786|
|   python|         2015|             9|          445|
|   others|         2015|            11|         4483|
|   others|         2013|             6|        38643|
|   python|         2015|            11|          666|
|   others|         2009|            10|         6510|
|   others

                                                                                

In [7]:
from pyspark.sql.window import Window

In [8]:

# Window for a year-to-date running total: rows are grouped per
# (post_type, creation_year), ordered by month, and the frame spans from the
# first row of the partition up to and including the current row.
running_sum_window_spec = (
    Window
    .partitionBy("post_type", "creation_year")
    .orderBy("creation_month")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

In [9]:

# Cumulative monthly_views within each (post_type, creation_year) partition,
# as defined by running_sum_window_spec.
running_total = F.sum("monthly_views").over(running_sum_window_spec)

# Attach the running total and sort the result by month for readability.
df_final_result = (
    df_tagged
    .withColumn("running_total_views", running_total)
    .orderBy("creation_month")
)

In [1]:
# df_final_result.show()

In [18]:
# Publish the running-total result to the BigQuery table
# data_mart.rolling_view_count_by_tags through the spark-bigquery connector,
# using the temporaryGcsBucket location as the staging area for the load.
#
# The save mode is set explicitly: the DataFrameWriter default is
# "errorifexists", so without it this cell raises on every re-run (for
# example Restart & Run All) once the table has been created. The table is
# derived entirely from df_final_result, so each run replaces its contents.
(
    df_final_result.write
    .format('bigquery')
    .mode('overwrite')
    .option('table', 'data_mart.rolling_view_count_by_tags')
    .option('temporaryGcsBucket', 'pyspark-fs-sid/tmp_staging')
    .save()
)

                                                                                

In [19]:
# Also export the result as Parquet files to GCS.
# NOTE(review): "append" adds new files next to whatever is already in the
# directory, so each re-run of this cell stores another full copy of the
# rows -- confirm that is acceptable.
(
    df_final_result.write
    .format("parquet")
    .mode("append")
    .save("gs://spark-result-bkt/spark_output/")
)

                                                                                

In [20]:

# Export the result as JSON files to GCS.
# NOTE(review): this targets the same directory as the Parquet export in the
# preceding cell, and "overwrite" replaces the existing contents of the
# target path -- so the Parquet files written there are removed. Confirm
# this is intentional, or give each format its own directory.
(
    df_final_result.write
    .format("json")
    .mode("overwrite")
    .save("gs://spark-result-bkt/spark_output/")
)

                                                                                

In [None]:

# Window for a trailing average: within each (post_type, creation_year)
# partition ordered by month, the frame covers the current row and the two
# rows before it. The frame counts ROWS, not calendar months, so it spans
# exactly three months only when no month is missing from the partition.
running_avg_window_spec = (
    Window
    .partitionBy("post_type", "creation_year")
    .orderBy("creation_month")
    .rowsBetween(-2, Window.currentRow)
)


In [None]:

# Average of monthly_views over the trailing three-row frame defined by
# running_avg_window_spec.
trailing_avg = F.avg("monthly_views").over(running_avg_window_spec)

# Display only (nothing is assigned): first 50 rows, sorted by month, with
# column values printed untruncated.
(
    df_tagged
    .withColumn("running_3monthly_avg_views", trailing_avg)
    .orderBy("creation_month")
    .show(n=50, truncate=False)
)
