# Using pyspark with the fanfiction data

### Using PySpark to manage and wrangle the data, as a way to learn PySpark

### Creating functions to calculate KPIs, so that adding a new KPI only requires specifying a metric name and a column

In [1]:
# findspark puts the local Spark installation (SPARK_HOME) on sys.path, so it
# must run *before* pyspark is imported; calling findspark.init() after the
# pyspark imports (as this cell originally did) has no effect on the pyspark
# modules that were already loaded.
import findspark

findspark.init()

from functools import reduce

import pandas as pd
import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql import functions as F, types as T


# https://github.com/kevinschaich/pyspark-cheatsheet
# https://github.com/edyoda/pyspark-tutorial/blob/master/PySpark-DataFrames.ipynb
# https://towardsdatascience.com/7-must-know-pyspark-functions-d514ca9376b9

# Last expression of the cell: display the Spark home directory findspark resolved.
findspark.find()




'C:\\spark-3.2.1-bin-hadoop3.2'

In [2]:

     
# Create (or reuse, via getOrCreate) a Spark session running locally.
# NOTE(review): the app name looks like a leftover from a tutorial — consider
# renaming it to something that describes this fanfiction notebook.
spark = (
    SparkSession.builder
    .master('local')
    .appName('GitHub Activity - Getting Started')
    .getOrCreate()
)

# Smoke test: a trivial SQL query confirms the session is usable.
spark.sql('SELECT current_date').show()

+--------------+
|current_date()|
+--------------+
|    2022-07-19|
+--------------+



In [3]:
# The fanfiction texts span several lines and contain quotes, so the reader needs
# multiline = 'True' and escape = '"' to parse each work as a single record.
df_fan = (
    spark.read
    .option('header', 'True')
    .option('inferSchema', 'True')
    .option('delimiter', ",")
    .option('multiline', 'True')
    .option('escape', '"')
    .csv("fanfiction_data_2022-06-05.csv")
)
df_fan.show(4)

+--------+--------------------+--------------------+--------------------+--------+------------------------------------+--------------------+--------------------+--------+----------+-----+--------+--------+-----+---------+----+--------------------+-------------+
|      id|                link|                text|              rating|category|                              fandom|        relationship|           character|language| published|words|chapters|comments|kudos|bookmarks|hits|       title heading|       author|
+--------+--------------------+--------------------+--------------------+--------+------------------------------------+--------------------+--------------------+--------+----------+-----+--------+--------+-----+---------+----+--------------------+-------------+
|39447360|https://archiveof...|\n\nChapter Text\...|Teen And Up Audie...|     M/M|                Heartstopper (Web...|Nicholas "Nick" N...|Nicholas "Nick" N...| English|2022-06-05|  940|    2/20|       2|    7|   

In [4]:
# take a look at the data
# Prints the column names, inferred types and nullability of df_fan.
# Note from the output: `published` is read as a string (not a date) and
# `chapters` holds text such as "2/20", so neither is numeric.
df_fan.printSchema()

root
 |-- id: integer (nullable = true)
 |-- link: string (nullable = true)
 |-- text: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- category: string (nullable = true)
 |-- fandom: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- character: string (nullable = true)
 |-- language: string (nullable = true)
 |-- published: string (nullable = true)
 |-- words: integer (nullable = true)
 |-- chapters: string (nullable = true)
 |-- comments: integer (nullable = true)
 |-- kudos: integer (nullable = true)
 |-- bookmarks: integer (nullable = true)
 |-- hits: integer (nullable = true)
 |-- title heading: string (nullable = true)
 |-- author: string (nullable = true)



## Create functions to calculate KPIs
The KPIs in Airflow are calculated using SQL files. I want to try a different approach: using PySpark instead. The idea is to run PySpark functions in AWS Lambda that calculate and insert the KPIs for a given day after new fanfiction data has been inserted into the main table.

This approach mimics the Airflow pipeline but uses only Lambda and PySpark.

In [36]:
"""
Many of the KPIs are simple counting, for example; counting the number of each rating or category
I have therefore created a function that takes the variable that should be counted as input.
The dictionary "metric_dic_count" determines which variables will be included and the name of the metric 
"""


# Format: {column_in_df_fan: metric_name_in_output_table}
# df_fan has columns such as language, rating and category; every entry below
# produces one count KPI, in the order listed.
metric_dic_count = {
    'language': 'language_count',
    'rating': 'rating_count',
    'category': 'category_count',
}

def metric_count(variable, metric_value, df):
  """Count the rows of ``df`` per distinct value of one column, as KPI rows.

  Args:
    variable: name of the column to group by; its values end up in the
      output column ``variable`` (nulls form their own group).
    metric_value: metric name written into every row of the ``metric`` column.
    df: the Spark DataFrame to aggregate.

  Returns:
    A Spark DataFrame with columns metric, variable, timestamp, date, value,
    where ``value`` is the number of rows in each group.
  """
  grouped = df.groupBy(variable).agg(F.count('*').alias('value'))
  stamped = (
    grouped
    .withColumn('timestamp', F.current_timestamp())
    .withColumn('date', F.current_date())
    .withColumn('metric', F.lit(metric_value))
  )
  # Generic column name so frames counted on different columns can be unioned.
  renamed = stamped.withColumnRenamed(variable, 'variable')
  # Fixed output column order shared by every KPI frame.
  return renamed.select('metric', 'variable', 'timestamp', 'date', 'value')




def get_metric_count(metric_dic_count, df=None):
  """Compute every count KPI in ``metric_dic_count`` and stack the results.

  Args:
    metric_dic_count: mapping ``{column_to_count: metric_name}``; one count
      KPI is produced per entry, in the mapping's iteration order.
    df: Spark DataFrame to aggregate. Defaults to the notebook-level
      ``df_fan`` so existing one-argument calls keep working.

  Returns:
    One Spark DataFrame (columns metric, variable, timestamp, date, value)
    holding the rows of every count KPI.

  Raises:
    ValueError: if ``metric_dic_count`` is empty (there is nothing to union).
  """
  if not metric_dic_count:
    # reduce() on an empty list would fail with an unhelpful TypeError.
    raise ValueError('metric_dic_count must contain at least one column to count')
  if df is None:
    df = df_fan
  metric_frames = [metric_count(column, metric_name, df)
                   for column, metric_name in metric_dic_count.items()]
  # unionByName aligns columns by name instead of position, so a change in
  # metric_count's column order cannot silently misalign values.
  return reduce(DataFrame.unionByName, metric_frames)

# df_metric is a single DataFrame: get_metric_count has already unioned the
# per-column count frames (one per entry of metric_dic_count).
df_metric = get_metric_count(metric_dic_count)
df_metric.show()

+--------------+--------------------+--------------------+----------+-----+
|        metric|            variable|           timestamp|      date|value|
+--------------+--------------------+--------------------+----------+-----+
|language_count|             English|2022-07-19 22:49:...|2022-07-19|   18|
|language_count|    中文-普通话 國語|2022-07-19 22:49:...|2022-07-19|    2|
|  rating_count|           Not Rated|2022-07-19 22:49:...|2022-07-19|    1|
|  rating_count|Teen And Up Audie...|2022-07-19 22:49:...|2022-07-19|    6|
|  rating_count|              Mature|2022-07-19 22:49:...|2022-07-19|    4|
|  rating_count|   General Audiences|2022-07-19 22:49:...|2022-07-19|    8|
|  rating_count|            Explicit|2022-07-19 22:49:...|2022-07-19|    1|
|category_count|                null|2022-07-19 22:49:...|2022-07-19|    2|
|category_count|                 F/F|2022-07-19 22:49:...|2022-07-19|    1|
|category_count|               Other|2022-07-19 22:49:...|2022-07-19|    1|
|category_count|   

In [37]:
# save to csv
# NOTE(review): nothing is written by this cell — both write calls below are
# commented out, and `outdata` / `outloc` are not defined anywhere in the notebook.
#outdata.write.mode('append').parquet(outloc)

#df_fan.repartition(1).write.mode('append').csv("cc_out.csv", sep=',')  #append to the folder

# Convert it into a pandas dataframe to save it, not good when large data
# The pandas frame is only displayed here (last expression), not saved.
df_metric.toPandas()

Unnamed: 0,metric,variable,timestamp,date,value
0,language_count,English,2022-07-19 22:57:17.599,2022-07-19,18
1,language_count,中文-普通话 國語,2022-07-19 22:57:17.599,2022-07-19,2
2,rating_count,Not Rated,2022-07-19 22:57:17.599,2022-07-19,1
3,rating_count,Teen And Up Audiences,2022-07-19 22:57:17.599,2022-07-19,6
4,rating_count,Mature,2022-07-19 22:57:17.599,2022-07-19,4
5,rating_count,General Audiences,2022-07-19 22:57:17.599,2022-07-19,8
6,rating_count,Explicit,2022-07-19 22:57:17.599,2022-07-19,1
7,category_count,,2022-07-19 22:57:17.599,2022-07-19,2
8,category_count,F/F,2022-07-19 22:57:17.599,2022-07-19,1
9,category_count,Other,2022-07-19 22:57:17.599,2022-07-19,1


In [9]:
"""
The first function was only counting, this function will be able to do every agg function 
such as average, median and so on.

Write in the dictionary "metric_dic_agg" to add a KPI, the format is the following: 
{columnt to agg: [[columnvalue to agg, the wanted name for the aggregated column, agg function]]}
"""

# Add KPIs here
# Example: for the column rating three KPIs are calculated: sum of hits, sum of kudos and average words.
# Every top-level key must be a column of df_fan. The publication date is stored
# in "published"; df_fan has no "date" column, so grouping by 'date' would fail.
metric_dic_agg = {
    'rating': [
        ['hits', 'hits_rating_sum', 'sum'],
        ['kudos', 'kudos_rating_sum', 'sum'],
        ['words', 'word_rating_average', 'avg'],
    ],
    'published': [['words', 'word_day_average', 'avg']],
}


# Two ways to aggregate with Spark, both shown on the sum of hits per rating.

# 1) Column expression: F.sum('hits')
(
    df_fan.groupBy('rating')
    .agg(F.sum('hits'))
    .withColumn('timestamp', F.current_timestamp())
    .withColumn('date', F.current_date())
    .withColumn('metric', F.lit('metric_value'))
    .withColumnRenamed('rating', 'variable')  # generic name for the grouping column
    .show()
)

# 2) Dictionary {column: function} passed to agg. This form is easy to drive
#    from a config dictionary, so the KPI functions use it.
(
    df_fan.groupBy('rating')
    .agg({'hits': 'sum'})
    .withColumn('timestamp', F.current_timestamp())
    .withColumn('date', F.current_date())
    .withColumn('metric', F.lit('metric_value'))
    .withColumnRenamed('rating', 'variable')  # generic name for the grouping column
    .show()
)

+--------------------+---------+--------------------+----------+------------+
|            variable|sum(hits)|           timestamp|      date|      metric|
+--------------------+---------+--------------------+----------+------------+
|           Not Rated|        4|2022-07-17 23:04:...|2022-07-17|metric_value|
|Teen And Up Audie...|      505|2022-07-17 23:04:...|2022-07-17|metric_value|
|              Mature|      123|2022-07-17 23:04:...|2022-07-17|metric_value|
|   General Audiences|       77|2022-07-17 23:04:...|2022-07-17|metric_value|
|            Explicit|       11|2022-07-17 23:04:...|2022-07-17|metric_value|
+--------------------+---------+--------------------+----------+------------+

+--------------------+---------+--------------------+----------+------------+
|            variable|sum(hits)|           timestamp|      date|      metric|
+--------------------+---------+--------------------+----------+------------+
|           Not Rated|        4|2022-07-17 23:04:...|2022-07-17

In [38]:
# KPI configuration used by get_metric_agg.
# Format: {column to group by: [[column to aggregate, metric name, agg function], ...]}
metric_dic_agg = {
    'rating': [
        ['hits', 'hits_rating_sum', 'sum'],
        ['kudos', 'kudos_rating_sum', 'sum'],
        ['words', 'word_rating_average', 'avg'],
    ],
    'published': [['words', 'word_day_average', 'avg']],
    'language': [['words', 'word_language_avg', 'avg']],
}


def metric_agg(key, agg_col, metric_name, agg_fun, df=None):
  """Aggregate one column per distinct value of another, as KPI rows.

  Args:
    key: column to group by; its values end up in the output column ``variable``.
    agg_col: column whose values are aggregated.
    metric_name: metric name written into every row of the ``metric`` column.
    agg_fun: aggregate function name for Spark's dictionary form of
      ``agg`` (e.g. 'sum', 'avg', 'max').
    df: Spark DataFrame to aggregate. Defaults to the notebook-level ``df_fan``
      so existing four-argument calls keep working.

  Returns:
    A Spark DataFrame with columns metric, variable, timestamp, date, value.
  """
  if df is None:
    df = df_fan
  # groupBy(key).agg({...}) yields exactly two columns: the key, then the
  # aggregate. Rename them by position rather than guessing Spark's generated
  # name: f'{agg_fun}({agg_col})' does not match for aliases such as 'mean'
  # (Spark names that column avg(...)), which left no 'value' column to select.
  # Renaming first also stops a grouping column called 'date' or 'timestamp'
  # from being overwritten by the columns added below.
  aggregated = df.groupBy(key).agg({agg_col: agg_fun}).toDF('variable', 'value')
  kpi_frame = (
    aggregated
    .withColumn('timestamp', F.current_timestamp())
    .withColumn('date', F.current_date())
    .withColumn('metric', F.lit(metric_name))
  )
  # Fixed output column order shared by every KPI frame.
  return kpi_frame.select('metric', 'variable', 'timestamp', 'date', 'value')


def get_metric_agg(metric_dic_agg):
  """Compute every aggregate KPI in ``metric_dic_agg`` and stack the results.

  Args:
    metric_dic_agg: mapping
      ``{group_by_column: [[agg_col, metric_name, agg_fun], ...]}``; each
      inner triple becomes one KPI, processed in iteration order.

  Returns:
    One Spark DataFrame (columns metric, variable, timestamp, date, value)
    holding the rows of every KPI.

  Raises:
    ValueError: if the mapping defines no KPI triples (nothing to union).
  """
  metric_frames = [
    metric_agg(key, agg_col, metric_name, agg_fun)
    for key, specs in metric_dic_agg.items()
    for agg_col, metric_name, agg_fun in specs
  ]
  if not metric_frames:
    # reduce() on an empty list would fail with an unhelpful TypeError.
    raise ValueError('metric_dic_agg must define at least one KPI')
  # unionByName aligns columns by name instead of position.
  return reduce(DataFrame.unionByName, metric_frames)

# Build one DataFrame holding the rows of every aggregate KPI
# (get_metric_agg unions the per-KPI frames) and display it.
df_agg = get_metric_agg(metric_dic_agg)
df_agg.show()



+-------------------+--------------------+--------------------+----------+------------------+
|             metric|            variable|           timestamp|      date|             value|
+-------------------+--------------------+--------------------+----------+------------------+
|    hits_rating_sum|           Not Rated|2022-07-19 22:58:...|2022-07-19|               4.0|
|    hits_rating_sum|Teen And Up Audie...|2022-07-19 22:58:...|2022-07-19|             505.0|
|    hits_rating_sum|              Mature|2022-07-19 22:58:...|2022-07-19|             123.0|
|    hits_rating_sum|   General Audiences|2022-07-19 22:58:...|2022-07-19|              77.0|
|    hits_rating_sum|            Explicit|2022-07-19 22:58:...|2022-07-19|              11.0|
|   kudos_rating_sum|           Not Rated|2022-07-19 22:58:...|2022-07-19|               0.0|
|   kudos_rating_sum|Teen And Up Audie...|2022-07-19 22:58:...|2022-07-19|              91.0|
|   kudos_rating_sum|              Mature|2022-07-19 22:58:.