In [14]:
%%time
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, month, broadcast
import os
# Reuse the active Spark session if there is one, otherwise start a new one
# named after this exercise.
spark = SparkSession.builder.appName('Optimize I').getOrCreate()

# Show the kernel's working directory so a wrong launch location is easy to spot.
base_path = os.getcwd()
print(base_path)

# Root of the project checkout that contains the `data/` folder.
# The location is machine-specific, so it can be overridden by setting the
# OPTIMIZATION_PROJECT_PATH environment variable before starting the kernel.
# Without the override the original author's local checkout path is used, so
# existing runs behave exactly as before.
project_path = os.environ.get(
    'OPTIMIZATION_PROJECT_PATH',
    r'C:\Users\samy8\Desktop\Work Lab\SpringBoard\github\Spark-Optimization-Mini-Project\Optimization\Optimization',
)
print(project_path)

answers_input_path = os.path.join(project_path, 'data/answers')
questions_input_path = os.path.join(project_path, 'data/questions')

# No format is specified, so Spark reads both inputs with its default data
# source (Parquet unless `spark.sql.sources.default` has been changed).
answersDF = spark.read.load(answers_input_path)
questionsDF = spark.read.load(questions_input_path)

# Each count() triggers a full scan; the printed row counts double as a check
# that both inputs were found and are readable.
print(answersDF.count())
print(questionsDF.count())

C:\Users\samy8
C:\Users\samy8\Desktop\Work Lab\SpringBoard\github\Spark-Optimization-Mini-Project\Optimization\Optimization
110714
86936
Wall time: 506 ms


In [15]:
%%time
# Baseline query, no tuning applied.

# Count the answers each question received per month-of-year.
# month() keeps only the month number, so the year is not part of the grouping.
answers_month = (
    answersDF
    .withColumn('month', month('creation_date'))
    .groupBy('question_id', 'month')
    .agg(count('*').alias('cnt'))
)

# Attach each question's creation date and title to its monthly answer counts.
resultDF = (
    questionsDF
    .join(answers_month, 'question_id')
    .select('question_id', 'creation_date', 'title', 'month', 'cnt')
)

# show() prints only the first 20 rows of the sorted result.
resultDF.orderBy('question_id', 'month').show()

+-----------+--------------------+--------------------+-----+---+
|question_id|       creation_date|               title|month|cnt|
+-----------+--------------------+--------------------+-----+---+
|     155989|2014-12-31 20:59:...|Frost bubble form...|    1|  1|
|     155989|2014-12-31 20:59:...|Frost bubble form...|    2|  1|
|     155990|2014-12-31 21:51:...|The abstract spac...|    1|  2|
|     155992|2014-12-31 22:44:...|centrifugal force...|    1|  1|
|     155993|2014-12-31 22:56:...|How can I estimat...|    1|  1|
|     155995|2015-01-01 00:16:...|Why should a solu...|    1|  3|
|     155996|2015-01-01 01:06:...|Why do we assume ...|    1|  2|
|     155996|2015-01-01 01:06:...|Why do we assume ...|    2|  1|
|     155996|2015-01-01 01:06:...|Why do we assume ...|   11|  1|
|     155997|2015-01-01 01:26:...|Why do square sha...|    1|  3|
|     155999|2015-01-01 02:01:...|Diagonalizability...|    1|  1|
|     156008|2015-01-01 03:48:...|Capturing a light...|    1|  2|
|     1560

In [16]:
%%time
# Cache + broadcast variant: the aggregated answers are cached and then
# shipped to every executor with a broadcast hint for the join.

# Count the answers each question received per month-of-year.
answers_month = (
    answersDF
    .withColumn('month', month('creation_date'))
    .groupBy('question_id', 'month')
    .agg(count('*').alias('cnt'))
)

# cache() is lazy: nothing is stored until the first action below runs.
# Caching only pays off when the same DataFrame is reused; here it is consumed
# once, so the cell mainly demonstrates the cost of materialising the cache.
answers_month.cache()

try:
    # broadcast() hints Spark to use a broadcast join with answers_month as
    # the broadcast side, avoiding a shuffle of questionsDF.
    resultDF = (
        questionsDF
        .join(broadcast(answers_month), 'question_id')
        .select('question_id', 'creation_date', 'title', 'month', 'cnt')
    )

    resultDF.orderBy('question_id', 'month').show()
finally:
    # Release the cached data even if the query fails. Spark matches cached
    # data by query plan, so leaving this cached would hold executor memory
    # and let later cells that rebuild the same aggregation read from the
    # cache, skewing their timings.
    answers_month.unpersist()

+-----------+--------------------+--------------------+-----+---+
|question_id|       creation_date|               title|month|cnt|
+-----------+--------------------+--------------------+-----+---+
|     155989|2014-12-31 20:59:...|Frost bubble form...|    1|  1|
|     155989|2014-12-31 20:59:...|Frost bubble form...|    2|  1|
|     155990|2014-12-31 21:51:...|The abstract spac...|    1|  2|
|     155992|2014-12-31 22:44:...|centrifugal force...|    1|  1|
|     155993|2014-12-31 22:56:...|How can I estimat...|    1|  1|
|     155995|2015-01-01 00:16:...|Why should a solu...|    1|  3|
|     155996|2015-01-01 01:06:...|Why do we assume ...|    1|  2|
|     155996|2015-01-01 01:06:...|Why do we assume ...|    2|  1|
|     155996|2015-01-01 01:06:...|Why do we assume ...|   11|  1|
|     155997|2015-01-01 01:26:...|Why do square sha...|    1|  3|
|     155999|2015-01-01 02:01:...|Diagonalizability...|    1|  1|
|     156008|2015-01-01 03:48:...|Capturing a light...|    1|  2|
|     1560

In [17]:
%%time

# Same query with the join sides swapped: the aggregated answers are now the
# left side and the questions the right side.

# Count the answers each question received per month-of-year.
answers_month = (
    answersDF
    .withColumn('month', month('creation_date'))
    .groupBy('question_id', 'month')
    .agg(count('*').alias('cnt'))
)

# The selected columns are listed explicitly, so the output column order does
# not depend on which side of the join comes first.
resultDF = (
    answers_month
    .join(questionsDF, on='question_id')
    .select('question_id', 'creation_date', 'title', 'month', 'cnt')
)

resultDF.orderBy('question_id', 'month').show()

+-----------+--------------------+--------------------+-----+---+
|question_id|       creation_date|               title|month|cnt|
+-----------+--------------------+--------------------+-----+---+
|     155989|2014-12-31 20:59:...|Frost bubble form...|    1|  1|
|     155989|2014-12-31 20:59:...|Frost bubble form...|    2|  1|
|     155990|2014-12-31 21:51:...|The abstract spac...|    1|  2|
|     155992|2014-12-31 22:44:...|centrifugal force...|    1|  1|
|     155993|2014-12-31 22:56:...|How can I estimat...|    1|  1|
|     155995|2015-01-01 00:16:...|Why should a solu...|    1|  3|
|     155996|2015-01-01 01:06:...|Why do we assume ...|    1|  2|
|     155996|2015-01-01 01:06:...|Why do we assume ...|    2|  1|
|     155996|2015-01-01 01:06:...|Why do we assume ...|   11|  1|
|     155997|2015-01-01 01:26:...|Why do square sha...|    1|  3|
|     155999|2015-01-01 02:01:...|Diagonalizability...|    1|  1|
|     156008|2015-01-01 03:48:...|Capturing a light...|    1|  2|
|     1560

In [18]:
%%time
# Repartition variant: redistribute the answers by question_id before
# aggregating, so rows for the same question sit in the same partition.

repAnswersDF = (
    answersDF
    .repartition('question_id')
    .withColumn('month', month('creation_date'))
    .groupBy('question_id', 'month')
    .agg(count('*').alias('cnt'))
)

# Unlike the earlier cells, the ordering is part of the stored DataFrame here,
# so repResultDF itself is already sorted by question and month.
repResultDF = (
    questionsDF
    .join(repAnswersDF, 'question_id')
    .select('question_id', 'creation_date', 'title', 'month', 'cnt')
    .orderBy('question_id', 'month')
)
repResultDF.show()

+-----------+--------------------+--------------------+-----+---+
|question_id|       creation_date|               title|month|cnt|
+-----------+--------------------+--------------------+-----+---+
|     155989|2014-12-31 20:59:...|Frost bubble form...|    1|  1|
|     155989|2014-12-31 20:59:...|Frost bubble form...|    2|  1|
|     155990|2014-12-31 21:51:...|The abstract spac...|    1|  2|
|     155992|2014-12-31 22:44:...|centrifugal force...|    1|  1|
|     155993|2014-12-31 22:56:...|How can I estimat...|    1|  1|
|     155995|2015-01-01 00:16:...|Why should a solu...|    1|  3|
|     155996|2015-01-01 01:06:...|Why do we assume ...|    1|  2|
|     155996|2015-01-01 01:06:...|Why do we assume ...|    2|  1|
|     155996|2015-01-01 01:06:...|Why do we assume ...|   11|  1|
|     155997|2015-01-01 01:26:...|Why do square sha...|    1|  3|
|     155999|2015-01-01 02:01:...|Diagonalizability...|    1|  1|
|     156008|2015-01-01 03:48:...|Capturing a light...|    1|  2|
|     1560