In [None]:
from pyspark.sql import SparkSession
import os

# Spark session used by every cell below to read the Stack Overflow exports from GCS.
spark = (
    SparkSession.builder
    .appName('Access GCS')
    .getOrCreate()
)

# Authenticate the GCS Hadoop connector with a service-account key file.
# NOTE(review): this path is environment-specific (e.g. a mounted container volume) — confirm it exists.
spark._jsc.hadoopConfiguration().set(
    "google.cloud.auth.service.account.json.keyfile",
    "/.google/credentials/google_credentials_project.json",
)

# Data-lake bucket and GCP project come from the environment; os.getenv yields None when unset.
BUCKET = os.getenv('GCP_GCS_BUCKET')
PROJECT = os.getenv('GCP_PROJECT_ID')

In [None]:
def load_to_bigquery(table, date, destination='wordcount_dataset.wordcount_output',
                     mode='errorifexists', temporary_gcs_bucket=None):
    """Copy one exported parquet snapshot from GCS into a BigQuery table.

    Reads ``gs://{BUCKET}/BigQuery/{table}-{date}/*.parquet`` and writes it with
    the spark-bigquery connector (``format('bigquery')``).

    Args:
        table: source table name, used in the GCS export folder name.
        date: suffix of the export folder (``{table}-{date}``).
        destination: BigQuery ``dataset.table`` to write to. The default is the
            previously hard-coded target, kept for backward compatibility; pass a
            per-table destination when loading several tables, otherwise they all
            land in the same BigQuery table.
        mode: Spark save mode. ``'errorifexists'`` is Spark's own default, so the
            original behavior is unchanged unless a caller overrides it.
        temporary_gcs_bucket: optional staging bucket for the connector's
            indirect write path; the option is only set when a value is given.
    """
    path = f"gs://{BUCKET}/BigQuery/{table}-{date}/*.parquet"
    # Parquet embeds its schema; `header` is a CSV-only option and was a no-op here.
    df = spark.read.parquet(path)
    writer = (
        df.write.format('bigquery')
        .option('table', destination)
        .mode(mode)
    )
    if temporary_gcs_bucket is not None:
        writer = writer.option('temporaryGcsBucket', temporary_gcs_bucket)
    writer.save()

In [None]:
from google.cloud import storage
import pandas as pd
import os
import subprocess

# List the parquet part files exported under BigQuery/ in the data-lake bucket.
# Use the same project/bucket as the Spark reads (GCP_PROJECT_ID / GCP_GCS_BUCKET) so the
# gs://{BUCKET}/... URIs built from `files` later point at the objects actually listed here;
# fall back to the previously hard-coded names when the env vars are unset.
storage_client = storage.Client(PROJECT or "DE-stack-overflow")
bucket = storage_client.get_bucket(BUCKET or 'dtc_data_lake_de-stack-overflow')
# Filter server-side by prefix instead of listing the whole bucket; exports are read
# elsewhere as gs://<bucket>/BigQuery/<table>-<date>/*.parquet, hence the suffix check.
files = [
    blob.name
    for blob in bucket.list_blobs(prefix='BigQuery/')
    if blob.name.endswith('.parquet')
]

In [None]:
# Per-table count of exported parquet files (scaled by 4), plus the grand total.
# NOTE(review): the divisor 4 is unexplained — presumably parquet parts per export; confirm.
tables = ['badges', 'posts_questions', 'posts_answers', 'users']
total = 0
for table in tables:
    uris = [f'gs://{BUCKET}/{name}' for name in files if table in name]
    n_exports = len(uris) / 4
    print(table, n_exports)
    total += n_exports
total

In [None]:
# Read every dated posts_questions export as a single DataFrame.
# Parquet is self-describing, so no reader options are needed (`header` is CSV-only).
path = f"gs://{BUCKET}/BigQuery/posts_questions-*/*.parquet"
df = spark.read.parquet(path)

# Make it queryable from spark.sql as `posts_questions`.
df.createOrReplaceTempView('posts_questions')

In [None]:
# Share of questions that received at least one answer, per creation year.
spark.sql('''
SELECT EXTRACT(YEAR FROM creation_date) AS Year,
       COUNT(1)                         AS Number_of_Questions,
       ROUND(
         100 * SUM(CASE WHEN answer_count > 0 THEN 1 ELSE 0 END) / COUNT(1),
         1
       )                                AS Percent_Questions_with_Answers
FROM posts_questions
GROUP BY Year
ORDER BY Year
''').show()

In [None]:
# Read every dated export of the remaining tables.
# Parquet is self-describing, so no reader options are needed (`header` is CSV-only).
path_users = f"gs://{BUCKET}/BigQuery/users-*/*.parquet"
path_badges = f"gs://{BUCKET}/BigQuery/badges-*/*.parquet"
path_answers = f"gs://{BUCKET}/BigQuery/posts_answers-*/*.parquet"

df_users = spark.read.parquet(path_users)
df_badges = spark.read.parquet(path_badges)
df_answers = spark.read.parquet(path_answers)

In [None]:
# Register each DataFrame under the table name that the spark.sql queries reference.
for view_name, frame in (
    ('users', df_users),
    ('badges', df_badges),
    ('posts_answers', df_answers),
):
    frame.createOrReplaceTempView(view_name)

In [None]:
# What is the reputation and badge count of users across different tenures on StackOverflow?
# Inner query: one row per user with tenure (whole years, rounded), reputation and badge count.
# Outer query: number of users and average reputation / badge count per tenure bucket.
#
# - Tenure uses DATEDIFF on dates: it returns a day count on every Spark 3.x configuration,
#   whereas EXTRACT(DAY FROM ts - ts) only gives total days when timestamp subtraction
#   produces a day-time interval (version / spark.sql.legacy.interval.enabled dependent).
# - The inner GROUP BY names the column directly instead of the alias `user`, which relies on
#   alias-in-GROUP-BY resolution and can clash with the USER / CURRENT_USER literal functions.
# - The outer select keeps the `user_Tenure` spelling so the result column name is unchanged.
Q4 = spark.sql('''
SELECT user_Tenure,
       COUNT(1) AS Num_Users,
       ROUND(AVG(reputation)) AS Avg_Reputation,
       ROUND(AVG(num_badges)) AS Avg_Num_Badges
FROM (
  SELECT users.id AS user_id,
         ROUND(DATEDIFF(CURRENT_DATE(), MIN(users.creation_date)) / 365) AS user_tenure,
         MIN(users.reputation) AS reputation,
         SUM(IF(badges.user_id IS NULL, 0, 1)) AS num_badges
  FROM users
  LEFT JOIN badges
    ON users.id = badges.user_id
  GROUP BY users.id
) AS per_user
GROUP BY user_tenure
ORDER BY user_tenure
    ''')