In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, month, broadcast

import os

In [2]:
# Obtain the notebook's Spark session, reusing an already-running one if present.
spark = (
    SparkSession.builder
    .appName('Optimize I')
    .getOrCreate()
)

In [3]:
# Directory the kernel was started from.
base_path = os.getcwd()

# Root directory that contains the `data/` folder.
# Set the SPARK_OPTIM_PROJECT_PATH environment variable to point the notebook at
# another checkout; when it is unset, the original local path is used so existing
# runs behave exactly as before.
project_path = os.environ.get(
    'SPARK_OPTIM_PROJECT_PATH',
    '/Users/mallory/Desktop/DataEngineering/Springboard/DistributedComputing/SparkOptim',
)

# Input dataset locations, both resolved relative to project_path.
answers_input_path = os.path.join(project_path, 'data/answers')
print('Answers Input Path: ', answers_input_path)

questions_input_path = os.path.join(project_path, 'data/questions')
print('Questions Input Path: ', questions_input_path)



Answers Input Path:  /Users/mallory/Desktop/DataEngineering/Springboard/DistributedComputing/SparkOptim/data/answers
Questions Input Path:  /Users/mallory/Desktop/DataEngineering/Springboard/DistributedComputing/SparkOptim/data/questions


In [14]:
# Load both datasets with the session's default data source, passing each
# location directly to load() rather than through the 'path' option.
answersDF = spark.read.load(answers_input_path)
questionsDF = spark.read.load(questions_input_path)


In [11]:
# Preview the first 20 answer rows with full, untruncated cell values.
answersDF.show(n=20, truncate=False)

+-----------+---------+-----------------------+--------+-------+-----+
|question_id|answer_id|creation_date          |comments|user_id|score|
+-----------+---------+-----------------------+--------+-------+-----+
|226592     |226595   |2015-12-29 17:46:42.963|3       |82798  |2    |
|388057     |388062   |2018-02-22 12:52:16.39 |8       |520    |21   |
|293286     |293305   |2016-11-17 15:35:58.763|0       |47472  |2    |
|442499     |442503   |2018-11-22 00:34:33.433|0       |137289 |0    |
|293009     |293031   |2016-11-16 07:36:35.937|0       |83721  |0    |
|395532     |395537   |2018-03-25 00:51:18.84 |0       |1325   |0    |
|329826     |329843   |2017-04-29 10:42:49.757|4       |520    |1    |
|294710     |295061   |2016-11-26 19:29:17.543|2       |114696 |2    |
|291910     |291917   |2016-11-10 04:56:39.403|0       |114696 |2    |
|372382     |372394   |2017-12-03 20:17:41.083|0       |172328 |0    |
|178387     |178394   |2015-04-25 12:31:28.92 |6       |62726  |0    |
|39394

In [13]:
# Preview the first 20 question rows; long cell values are truncated for width.
questionsDF.show(n=20, truncate=True)

+-----------+--------------------+--------------------+--------------------+------------------+--------+-------+-----+
|question_id|                tags|       creation_date|               title|accepted_answer_id|comments|user_id|views|
+-----------+--------------------+--------------------+--------------------+------------------+--------+-------+-----+
|     382738|[optics, waves, f...|2018-01-28 01:22:...|What is the pseud...|            382772|       0|  76347|   32|
|     370717|[field-theory, de...|2017-11-25 03:09:...|What is the defin...|              null|       1|  75085|   82|
|     339944|[general-relativi...|2017-06-17 15:32:...|Could gravitation...|              null|      13| 116137|  333|
|     233852|[homework-and-exe...|2016-02-04 15:19:...|When does travell...|              null|       9|  95831|  185|
|     294165|[quantum-mechanic...|2016-11-22 05:39:...|Time-dependent qu...|              null|       1| 118807|   56|
|     173819|[homework-and-exe...|2015-04-02 10:

In [15]:
# Count answers per (question_id, month). `month` is the month number taken from
# creation_date, so the year is not part of the grouping key.
answers_by_month = (
    answersDF
    .withColumn('month', month('creation_date'))
    .groupBy('question_id', 'month')
    .agg(count('*').alias('cnt'))
)

# Mark the aggregate for caching so later cells can reuse it.
# To check the flag, inspect the `answers_by_month.is_cached` property.
answers_by_month.cache()


DataFrame[question_id: bigint, month: int, cnt: bigint]

In [10]:
# Preview the first 20 rows of the monthly answer counts.
answers_by_month.show(n=20)

+-----------+-----+---+
|question_id|month|cnt|
+-----------+-----+---+
|     358894|    9|  5|
|     332782|    5|  2|
|     281552|    9|  2|
|     332224|    5|  1|
|     395851|    3|  3|
|     192346|    7|  1|
|     302487|    1|  3|
|     317571|    3|  2|
|     179458|    5|  2|
|     294966|   11|  5|
|     199602|    8|  6|
|     251275|    4|  2|
|     208722|    9|  1|
|     284125|   10|  1|
|     427452|    9|  4|
|     399738|    4|  1|
|     217997|   11|  4|
|     386225|    2|  1|
|     305095|    1|  3|
|     206822|    9|  6|
+-----------+-----+---+
only showing top 20 rows



In [18]:
# Print only the physical plan of the cached aggregate.
answers_by_month.explain(extended=False)

== Physical Plan ==
*(1) ColumnarToRow
+- InMemoryTableScan [question_id#347L, month#375, cnt#391L]
      +- InMemoryRelation [question_id#347L, month#375, cnt#391L], StorageLevel(disk, memory, deserialized, 1 replicas)
            +- *(2) HashAggregate(keys=[question_id#347L, month#375], functions=[count(1)])
               +- Exchange hashpartitioning(question_id#347L, month#375, 200), ENSURE_REQUIREMENTS, [id=#206]
                  +- *(1) HashAggregate(keys=[question_id#347L, month#375], functions=[partial_count(1)])
                     +- *(1) Project [question_id#347L, month(cast(creation_date#349 as date)) AS month#375]
                        +- *(1) ColumnarToRow
                           +- FileScan parquet [question_id#347L,creation_date#349] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/mallory/Desktop/DataEngineering/Springboard/DistributedComputing/Sp..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<question_id:

In [16]:
# Attach question metadata to the monthly counts. questionsDF is given a
# broadcast hint for the join.
resultDF = (
    answers_by_month
    .join(broadcast(questionsDF), on='question_id')
    .select('question_id', 'creation_date', 'title', 'month', 'cnt')
)

# Show the first 20 joined rows ordered by question and then month.
resultDF.sort('question_id', 'month').show()


+-----------+--------------------+--------------------+-----+---+
|question_id|       creation_date|               title|month|cnt|
+-----------+--------------------+--------------------+-----+---+
|     155989|2014-12-31 19:59:...|Frost bubble form...|    2|  1|
|     155989|2014-12-31 19:59:...|Frost bubble form...|   12|  1|
|     155990|2014-12-31 20:51:...|The abstract spac...|    1|  2|
|     155992|2014-12-31 21:44:...|centrifugal force...|    1|  1|
|     155993|2014-12-31 21:56:...|How can I estimat...|    1|  1|
|     155995|2014-12-31 23:16:...|Why should a solu...|    1|  3|
|     155996|2015-01-01 00:06:...|Why do we assume ...|    1|  2|
|     155996|2015-01-01 00:06:...|Why do we assume ...|    2|  1|
|     155996|2015-01-01 00:06:...|Why do we assume ...|   11|  1|
|     155997|2015-01-01 00:26:...|Why do square sha...|    1|  3|
|     155999|2015-01-01 01:01:...|Diagonalizability...|    1|  1|
|     156008|2015-01-01 02:48:...|Capturing a light...|    1|  2|
|     1560

In [17]:
# Print only the physical plan of the joined result.
resultDF.explain(extended=False)

== Physical Plan ==
*(2) Project [question_id#347L, creation_date#361, title#362, month#375, cnt#391L]
+- *(2) BroadcastHashJoin [question_id#347L], [question_id#359L], Inner, BuildRight, false
   :- *(2) Filter isnotnull(question_id#347L)
   :  +- *(2) ColumnarToRow
   :     +- InMemoryTableScan [question_id#347L, month#375, cnt#391L], [isnotnull(question_id#347L)]
   :           +- InMemoryRelation [question_id#347L, month#375, cnt#391L], StorageLevel(disk, memory, deserialized, 1 replicas)
   :                 +- *(2) HashAggregate(keys=[question_id#347L, month#375], functions=[count(1)])
   :                    +- Exchange hashpartitioning(question_id#347L, month#375, 200), ENSURE_REQUIREMENTS, [id=#206]
   :                       +- *(1) HashAggregate(keys=[question_id#347L, month#375], functions=[partial_count(1)])
   :                          +- *(1) Project [question_id#347L, month(cast(creation_date#349 as date)) AS month#375]
   :                             +- *(1) Columnar

In [7]:
# Drop the cached aggregate now that it is no longer needed; the call does not
# wait for the cached blocks to be removed.
answers_by_month.unpersist(blocking=False)


DataFrame[question_id: bigint, month: int, cnt: bigint]