In [1]:
from pyspark.sql import SparkSession
import os

spark = SparkSession \
        .builder \
        .appName('Access GCS') \
        .getOrCreate()

spark._jsc.hadoopConfiguration() \
    .set("google.cloud.auth.service.account.json.keyfile","/.google/credentials/google_credentials_project.json")


BUCKET = os.getenv('GCP_GCS_BUCKET')
PROJECT = os.getenv('GCP_PROJECT_ID')

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/04/26 02:16:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Testing on GCS bucket

In [None]:
def load_to_bigquery(table, date):
    path=f"gs://{BUCKET}/BigQuery/{table}-{date}/*.parquet"
    df = spark.read.parquet(path, header = True)
    df.write.format('bigquery') \
      .option('table', 'wordcount_dataset.wordcount_output') \
      .save()

In [13]:
from google.cloud import storage
import pandas as pd
import os
import subprocess

# Initialise a client
storage_client = storage.Client("DE-stack-overflow")
# Create a bucket object for our bucket
bucket = storage_client.get_bucket('dtc_data_lake_de-stack-overflow')
# all files in the bucket 
files = list(bucket.list_blobs())
files = [blob.name for blob in files if 'BigQuery/' in blob.name]
files = [file for file in files if '.parquet' in file]

In [39]:
tables = ['badges', 'posts_questions', 'posts_answers', 'users']

In [45]:
total = 0
for table in tables:
    uris = [f'gs://{BUCKET}/'+ file for file in files if table in file]
    total += len(uris)/4
    print(table, len(uris)/4)
total

badges 164.0
posts_questions 165.0
posts_answers 164.5
users 165.0


658.5

In [50]:
path=rf"gs://{BUCKET}/BigQuery/posts_questions-*/*.parquet"
df = spark.read.parquet(path, header = True)

                                                                                

In [52]:
df.createOrReplaceTempView('posts_questions')

In [56]:
# What is the percentage of questions that have been answered over the years?
spark.sql('''
SELECT
  EXTRACT(YEAR FROM creation_date) AS Year,
  COUNT(*) AS Number_of_Questions,
  ROUND(100 * SUM(IF(answer_count > 0, 1, 0)) / COUNT(*), 1) AS Percent_Questions_with_Answers
FROM
  posts_questions
GROUP BY
  Year
ORDER BY
  Year
    ''').show()



+----+-------------------+------------------------------+
|Year|Number_of_Questions|Percent_Questions_with_Answers|
+----+-------------------+------------------------------+
|2008|              57755|                          99.9|
|2009|             342048|                          99.6|
|2010|             463005|                          99.1|
|2011|             573539|                          97.3|
|2012|            1441630|                          94.9|
|2013|            1409097|                          92.4|
|2014|            1566938|                          88.9|
|2015|            1982395|                          87.0|
|2016|            2056943|                          85.3|
|2017|            2118252|                          83.8|
|2018|            1891231|                          82.6|
|2019|            1769797|                          82.2|
|2020|            1605271|                          81.0|
|2021|            1557939|                          66.3|
|2022|        

                                                                                

In [3]:
path_users=rf"gs://{BUCKET}/BigQuery/users-*/*.parquet"
df_users = spark.read.parquet(path_users, header = True)
path_badges=rf"gs://{BUCKET}/BigQuery/badges-*/*.parquet"
df_badges = spark.read.parquet(path_badges, header = True)
path_answers=rf"gs://{BUCKET}/BigQuery/posts_answers-*/*.parquet"
df_answers = spark.read.parquet(path_answers, header = True)

                                                                                

In [4]:
df_users.createOrReplaceTempView('users')
df_badges.createOrReplaceTempView('badges')
df_answers.createOrReplaceTempView('posts_answers')

In [5]:
# What is the reputation and badge count of users across different tenures on StackOverflow?
Q4 = spark.sql('''
SELECT user_Tenure,
       COUNT(1) AS Num_Users,
       ROUND(AVG(reputation)) AS Avg_Reputation,
       ROUND(AVG(num_badges)) AS Avg_Num_Badges
FROM (
  SELECT users.id AS user,
          ROUND(EXTRACT(DAY FROM CURRENT_TIMESTAMP()-MIN(users.creation_date))/365) AS user_tenure,
         MIN(users.reputation) AS reputation,
         SUM(IF(badges.user_id IS NULL, 0, 1)) AS num_badges
  FROM users
  LEFT JOIN badges
  ON users.id = badges.user_id
  GROUP BY user
)
GROUP BY User_Tenure
ORDER BY User_Tenure
    ''')

In [6]:
spark.conf.set("temporaryGcsBucket",BUCKET)
Q4.write \
  .format("bigquery") \
  .option("writeMethod", "direct") \
  .save("de-stack-overflow.stack_overflow_data.Q4")

22/04/26 02:23:36 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/04/26 02:23:36 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/04/26 02:23:36 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/04/26 02:23:36 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/04/26 02:23:36 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/04/26 02:23:36 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/04/26 02:23:36 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/04/26 02:23:36 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/04/26 02:23:36 WARN RowBasedKeyValueBatch: Calling spill() on

22/04/26 02:23:40 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/04/26 02:23:40 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/04/26 02:23:40 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/04/26 02:23:40 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/04/26 02:23:40 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/04/26 02:23:41 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/04/26 02:23:41 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/04/26 02:23:41 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/04/26 02:23:41 WARN RowBasedKeyValueBatch: Calling spill() on

22/04/26 02:23:45 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/04/26 02:23:45 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/04/26 02:23:45 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/04/26 02:23:45 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/04/26 02:23:45 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/04/26 02:23:45 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/04/26 02:23:45 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/04/26 02:23:45 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/04/26 02:23:46 WARN RowBasedKeyValueBatch: Calling spill() on

22/04/26 02:23:54 WARN TaskMemoryManager: Failed to allocate a page (2097152 bytes), try again.
22/04/26 02:23:55 WARN TaskMemoryManager: Failed to allocate a page (2097152 bytes), try again.
22/04/26 02:23:55 WARN TaskMemoryManager: Failed to allocate a page (2097152 bytes), try again.
22/04/26 02:23:55 WARN TaskMemoryManager: Failed to allocate a page (2097152 bytes), try again.
22/04/26 02:23:55 WARN TaskMemoryManager: Failed to allocate a page (2097152 bytes), try again.
22/04/26 02:23:55 WARN TaskMemoryManager: Failed to allocate a page (2097152 bytes), try again.
22/04/26 02:23:56 WARN TaskMemoryManager: Failed to allocate a page (2097152 bytes), try again.
22/04/26 02:23:56 WARN TaskMemoryManager: Failed to allocate a page (2097152 bytes), try again.
22/04/26 02:23:56 WARN TaskMemoryManager: Failed to allocate a page (2097152 bytes), try again.
22/04/26 02:23:56 WARN TaskMemoryManager: Failed to allocate a page (2097152 bytes), try again.
22/04/26 02:23:56 WARN TaskMemoryManager

Py4JError: An error occurred while calling o55.save

In [82]:
# What are 10 of the “easier” gold badges to earn?
spark.sql('''
SELECT badge_name AS First_Gold_Badge, 
       COUNT(1) AS Num_Users,
       ROUND(AVG(tenure_in_days)) AS Avg_Num_Days
FROM
(
  SELECT 
    badges.user_id AS user_id,
    badges.name AS badge_name,
    EXTRACT(DAY FROM badges.date - users.creation_date) AS tenure_in_days,
    ROW_NUMBER() OVER (PARTITION BY badges.user_id
                       ORDER BY badges.date) AS row_number
  FROM 
    badges
  JOIN
    users
  ON badges.user_id = users.id
  WHERE badges.class = 1 
) 
WHERE row_number = 1
GROUP BY First_Gold_Badge
ORDER BY Num_Users DESC
LIMIT 10
    ''').show()



+----------------+---------+------------+
|First_Gold_Badge|Num_Users|Avg_Num_Days|
+----------------+---------+------------+
| Famous Question|   392661|      1587.0|
|         Fanatic|    26818|       848.0|
|    Great Answer|    23796|      1921.0|
|     Unsung Hero|    20557|       811.0|
|      Electorate|    11631|      1201.0|
|        Populist|    10509|      1680.0|
|       Publicist|     2515|      2386.0|
|         Steward|     1547|      1199.0|
|  Great Question|      710|       875.0|
|     Copy Editor|      342|       743.0|
+----------------+---------+------------+



                                                                                

## Uploading views to BigQuery

In [2]:
tables = ['users', 'badges', 'posts_answers', 'posts_questions']

for table in tables:
    table_name = 'df_' + str(table)
    table_name = spark.read \
      .format("bigquery")\
      .load(f"de-stack-overflow.stack_overflow_data.{table}_stack_overflow_data_partitioned") 
    
    table_name.createOrReplaceTempView(table) 

In [27]:
# What is the percentage of questions that have been answered over the years?
df_posts_questions = spark.sql('''
SELECT
  EXTRACT(YEAR FROM creation_date) AS Year,
  COUNT(*) AS Number_of_Questions,
  ROUND(100 * SUM(IF(answer_count > 0, 1, 0)) / COUNT(*), 1) AS Percent_Questions_with_Answers
FROM
  posts_questions
GROUP BY
  Year
ORDER BY
  Year
    ''')

In [33]:
spark.conf.set("temporaryGcsBucket",BUCKET)
df_posts_questions.write \
  .format("bigquery") \
  .option("writeMethod", "direct") \
  .save("de-stack-overflow.stack_overflow_data.Q1")

                                                                                

In [34]:
# What are 10 of the “easier” gold badges to earn?
Q2 = spark.sql('''
SELECT badge_name AS First_Gold_Badge, 
       COUNT(1) AS Num_Users,
       ROUND(AVG(tenure_in_days)) AS Avg_Num_Days
FROM
(
  SELECT 
    badges.user_id AS user_id,
    badges.name AS badge_name,
    EXTRACT(DAY FROM badges.date - users.creation_date) AS tenure_in_days,
    ROW_NUMBER() OVER (PARTITION BY badges.user_id
                       ORDER BY badges.date) AS row_number
  FROM 
    badges
  JOIN
    users
  ON badges.user_id = users.id
  WHERE badges.class = 1 
) 
WHERE row_number = 1
GROUP BY First_Gold_Badge
ORDER BY Num_Users DESC
LIMIT 10
    ''')

In [35]:
spark.conf.set("temporaryGcsBucket",BUCKET)
Q2.write \
  .format("bigquery") \
  .option("writeMethod", "direct") \
  .save("de-stack-overflow.stack_overflow_data.Q2")

                                                                                

In [36]:
# Which day of the week has most questions answered within an hour?
Q3 = spark.sql('''
SELECT
  question_day,
  COUNT(answer_id) as count_answers,
  COUNT(question_id) as count_questions,
  ROUND(COUNT(answer_id)/COUNT(question_id)*100,2) as percent_questions
FROM
  (SELECT
    EXTRACT(DAYOFWEEK from posts_questions.creation_date) as question_day,
    posts_questions.accepted_answer_id as answer_id,
    posts_questions.id as question_id
   FROM 
    posts_questions
   FULL JOIN
    posts_answers ON posts_questions.id = posts_answers.parent_id
   WHERE
     posts_answers.creation_date < posts_questions.creation_date + INTERVAL '1 hour')
GROUP BY
  question_day
ORDER BY 
  question_day
    ''')

In [37]:
spark.conf.set("temporaryGcsBucket",BUCKET)
Q3.write \
  .format("bigquery") \
  .option("writeMethod", "direct") \
  .save("de-stack-overflow.stack_overflow_data.Q3")

22/04/26 01:48:04 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/04/26 01:48:04 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/04/26 01:48:04 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/04/26 01:48:04 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/04/26 01:48:04 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/04/26 01:48:05 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/04/26 01:48:05 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/04/26 01:48:05 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/04/26 01:48:06 WARN RowBasedKeyValueBatch: Calling spill() on



22/04/26 01:48:17 WARN TaskMemoryManager: Failed to allocate a page (8388608 bytes), try again.
22/04/26 01:48:18 WARN TaskMemoryManager: Failed to allocate a page (8388608 bytes), try again.
22/04/26 01:48:29 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/04/26 01:48:30 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/04/26 01:48:30 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/04/26 01:48:30 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/04/26 01:48:32 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/04/26 01:48:32 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/04/26 01:48:33 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but

In [3]:
# What is the reputation and badge count of users across different tenures on StackOverflow?
Q4 = spark.sql('''
SELECT user_Tenure,
       COUNT(1) AS Num_Users,
       ROUND(AVG(reputation)) AS Avg_Reputation,
       ROUND(AVG(num_badges)) AS Avg_Num_Badges
FROM (
  SELECT users.id AS user,
          ROUND(EXTRACT(DAY FROM CURRENT_TIMESTAMP()-MIN(users.creation_date))/365) AS user_tenure,
         MIN(users.reputation) AS reputation,
         SUM(IF(badges.user_id IS NULL, 0, 1)) AS num_badges
  FROM users
  LEFT JOIN badges
  ON users.id = badges.user_id
  GROUP BY user
)
GROUP BY User_Tenure
ORDER BY User_Tenure
    ''')

In [4]:
spark.conf.set("temporaryGcsBucket",BUCKET)
Q4.write \
  .format("bigquery") \
  .option("writeMethod", "direct") \
  .save("de-stack-overflow.stack_overflow_data.Q4")

22/04/26 02:11:41 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/04/26 02:11:41 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/04/26 02:11:41 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/04/26 02:11:41 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/04/26 02:11:41 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/04/26 02:11:41 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/04/26 02:11:41 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/04/26 02:11:41 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/04/26 02:11:41 WARN RowBasedKeyValueBatch: Calling spill() on



22/04/26 02:11:45 ERROR Executor: Exception in task 4.0 in stage 4.0 (TID 16)
java.lang.OutOfMemoryError: Java heap space
	at java.base/java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:61)
	at java.base/java.nio.ByteBuffer.allocate(ByteBuffer.java:348)
	at org.apache.spark.io.ReadAheadInputStream.<init>(ReadAheadInputStream.java:105)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillReader.<init>(UnsafeSorterSpillReader.java:74)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillWriter.getReader(UnsafeSorterSpillWriter.java:159)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.getSortedIterator(UnsafeExternalSorter.java:523)
	at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:172)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)


22/04/26 02:11:46 WARN TaskSetManager: Lost task 0.0 in stage 4.0 (TID 12) (d6a9d30de4a5 executor driver): TaskKilled (Stage cancelled)
22/04/26 02:11:46 WARN TaskSetManager: Lost task 5.0 in stage 4.0 (TID 17) (d6a9d30de4a5 executor driver): TaskKilled (Stage cancelled)
22/04/26 02:11:46 WARN TaskSetManager: Lost task 1.0 in stage 4.0 (TID 13) (d6a9d30de4a5 executor driver): TaskKilled (Stage cancelled)
22/04/26 02:11:46 WARN TaskSetManager: Lost task 2.0 in stage 4.0 (TID 14) (d6a9d30de4a5 executor driver): TaskKilled (Stage cancelled)
22/04/26 02:11:46 WARN TaskSetManager: Lost task 6.0 in stage 4.0 (TID 18) (d6a9d30de4a5 executor driver): TaskKilled (Stage cancelled)
22/04/26 02:11:46 ERROR FileFormatWriter: Aborting job 832e7e30-bad2-470e-b00a-df2a3d5f2423.
org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 4.0 failed 1 times, most recent failure: Lost task 4.0 in stage 4.0 (TID 16) (d6a9d30de4a5 executor driver): java.lang.OutOfMemoryError: Java he

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.7/site-packages/py4j/clientserver.py", line 480, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.7/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/home/airflow/.local/lib/python3.7/site-packages/py4j/clientserver.py", line 504, in send_command
    "Error while sending or receiving", e, proto.ERROR_ON_RECEIVE)
py4j.protocol.Py4JNetworkError: Error while sending or receiving


Py4JError: An error occurred while calling o43.save

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.7/site-packages/py4j/clientserver.py", line 480, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.7/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/home/airflow/.local/lib/python3.7/site-packages/py4j/clientserver.py", line 504, in send_command
    "Error while sending or receiving", e, proto.ERROR_ON_RECEIVE)
py4j.protocol.Py4JNetworkError: Error while sending or receiving
