In [1]:
import argparse
from pyspark.sql import SparkSession
from pyspark.sql import types
from pyspark.conf import SparkConf
from pyspark.sql.functions import spark_partition_id 
from pyspark.context import SparkContext
from google.cloud import storage
from google.cloud import bigquery as bq

In [2]:
def check_bqtable_exists(client, table_id):
    """Report whether a BigQuery table can be fetched through ``client``.

    Args:
        client: Object exposing ``get_table(table_id)``; in this notebook a
            ``google.cloud.bigquery`` client.
        table_id: Table identifier handed unchanged to ``client.get_table``.

    Returns:
        True when ``client.get_table(table_id)`` returns without raising,
        False when it raises any ``Exception``.
    """
    try:
        client.get_table(table_id)
    except Exception:
        # Only ordinary errors count as "table not available". KeyboardInterrupt
        # and SystemExit are not Exception subclasses, so they propagate and the
        # kernel can still be interrupted while the lookup is in flight.
        # NOTE(review): auth/network failures are also reported as "missing";
        # narrowing to the client's not-found error would be stricter.
        return False
    return True

In [3]:
# Run parameters.
# The year is held as a string because it is concatenated into the parquet
# object path further down.
inputYear = str(2019)

# GCS bucket name and the service-account key file handed to the connectors.
bucketName = 'cvd-bucket-de2024'
keyLocation = '/Users/sonny/Git/de_zoomcamp_project_2024_SP/keys/cvd-key.json'

In [4]:
# Create the BigQuery output table, with an explicit schema, if it does not
# exist yet.
client = bq.Client()
table_id = "cvd-sp-de-zoomcamp-2024.cvd_dataset.bq_output_table"
tableExists = check_bqtable_exists(client, table_id)
if not tableExists:
    schema = [
        bq.SchemaField("Key", "STRING"),
        bq.SchemaField("Year", "INTEGER"),
        bq.SchemaField("Locationdesc", "STRING"),
        bq.SchemaField("Question", "STRING"),
        bq.SchemaField("Response", "STRING"),
        bq.SchemaField("Break_Out", "STRING"),
        bq.SchemaField("Break_Out_Category", "STRING"),
        bq.SchemaField("Sample_Size", "INTEGER"),
        bq.SchemaField("Data_value", "FLOAT"),
        bq.SchemaField("LocationID", "STRING"),
        bq.SchemaField("BreakoutID", "STRING"),
        bq.SchemaField("BreakOutCategoryID", "STRING"),
        bq.SchemaField("QuestionID", "STRING"),
    ]
    dataset = bq.Dataset("cvd-sp-de-zoomcamp-2024.cvd_dataset")
    dataset.location = "US"
    # exists_ok=True: the dataset may already exist even though the table is
    # missing. Without it create_dataset raises a Conflict error and the table
    # below would never be created.
    dataset = client.create_dataset(dataset, exists_ok=True, timeout=30)
    new_table = bq.Table(table_id, schema=schema)
    new_table = client.create_table(new_table)

In [5]:
# Spark configuration: master 'local[*]', the application name, the connector
# jars shipped under ./lib, and service-account authentication via keyLocation.
conf = (
    SparkConf()
    .setMaster('local[*]')
    .setAppName('cvdPipeline')
    .set("spark.jars", "./lib/gcs-connector-hadoop3-2.2.5.jar,./lib/spark-3.5-bigquery-0.37.0.jar")
    .set("spark.hadoop.google.cloud.auth.service.account.enable", "true")
    .set("spark.hadoop.google.cloud.auth.service.account.json.keyfile", keyLocation)
)

In [6]:
# Start the SparkContext, then register the gs:// filesystem classes and the
# service-account settings on its Hadoop configuration.
sc = SparkContext(conf=conf)
hadoop_conf = sc._jsc.hadoopConfiguration()
for hadoop_key, hadoop_value in (
    ("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS"),
    ("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem"),
    ("fs.gs.auth.service.account.json.keyfile", keyLocation),
    ("fs.gs.auth.service.account.enable", "true"),
):
    hadoop_conf.set(hadoop_key, hadoop_value)

24/04/08 22:16:12 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/04/08 22:17:04 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors
24/04/08 22:17:06 ERROR Inbox: Ignoring error
java.lang.NullPointerException: Cannot invoke "org.apache.spark.storage.BlockManagerId.executorId()" because "idWithoutTopologyInfo" is null
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$register(BlockManagerMasterEndpoint.scala:677)
	at org.apache.spark.storage.BlockManagerMasterEndpoint$$anonfun$receiveAndReply$1.applyOrElse(BlockManagerMasterEn

In [7]:
# Build (or reuse) the SparkSession from the running context's configuration,
# then set the temporaryGcsBucket session option to the project bucket.
session_builder = SparkSession.builder.config(conf=sc.getConf())
spark = session_builder.getOrCreate()
spark.conf.set('temporaryGcsBucket', bucketName)

In [8]:
# Load the current contents of the BigQuery output table into a DataFrame.
table_id = "cvd-sp-de-zoomcamp-2024.cvd_dataset.bq_output_table"
# NOTE(review): "header" looks like a CSV-style option; presumably it has no
# effect on the bigquery source — confirm before removing.
bq_reader = (
    spark.read.format('bigquery')
    .option('table', table_id)
    .option("credentialsFile", keyLocation)
    .option("header", True)
)
dfbq = bq_reader.load()

In [9]:
# Columns selected from the yearly parquet file, one per line.
colRead = [
    'Key',
    'Year',
    'Locationdesc',
    'Question',
    'Response',
    'Break_Out',
    'Break_Out_Category',
    'Sample_Size',
    'Data_value',
    'LocationID',
    'BreakoutID',
    'BreakOutCategoryID',
    'QuestionID',
]

In [10]:
# Load the selected year's parquet file from the project bucket, keeping only
# the columns listed in colRead.
# The path is built from bucketName so the bucket is configured in one place
# rather than repeated as a literal here. The former "header" option was
# dropped: it is a CSV setting and parquet files carry their own schema.
parquet_path = 'gs://' + bucketName + '/cdc_data_' + inputYear + '.parquet'
dfnew = spark.read.parquet(parquet_path).select(colRead)

                                                                                

In [11]:
# Discard every row that holds a NULL in any of the selected columns.
dfnew = dfnew.na.drop(how='any')

In [12]:
# Append the new year's rows to the rows already stored in BigQuery.
# unionByName matches columns by name; plain union() matches by position and
# would silently misalign data if the two frames ever differed in column order.
dfjoin = dfbq.unionByName(dfnew)

In [13]:
# Repartition the combined frame into 6 partitions keyed on Break_Out_Category.
partition_count = 6
partition_column = "Break_Out_Category"
dfpc = dfjoin.repartition(partition_count, partition_column)

In [14]:
# Review the partitions that were produced.
# The partition count is printed explicitly: as a bare expression in the middle
# of the cell its value was discarded and never displayed.
print(dfpc.rdd.getNumPartitions())
# Row count per partition id.
dfpc.withColumn("partitionId", spark_partition_id()).groupBy("partitionId").count().show()

                                                                                

+-----------+-----+
|partitionId|count|
+-----------+-----+
|          0| 7718|
|          1| 6511|
|          2| 9970|
|          3|11008|
+-----------+-----+



In [15]:
# Write the repartitioned frame back to the BigQuery table in overwrite mode,
# passing the project bucket as temporaryGcsBucket.
bq_writer = (
    dfpc.write.format("bigquery")
    .option("temporaryGcsBucket", bucketName)
    .option("credentialsFile", keyLocation)
    .option("header", True)
    .mode("overwrite")
)
bq_writer.save(table_id)

24/04/08 22:18:49 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

In [16]:
# Shut down the Spark session now that the write has been issued.
spark.stop()