In [None]:
from google.cloud import storage
import pyspark
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.sql import types
import os

# HADOOP_HOME points Spark at a local Hadoop install (presumably for winutils on
# Windows). Respect a value already set in the environment and only fall back to
# this machine-specific default when none is configured.
os.environ.setdefault('HADOOP_HOME', 'C:\\tools\\hadoop-3.2.0')

# Index table, read first; the other tables are joined onto it.
starting_file = "index.parquet"

# Tables joined onto the index table, in join order.
files = ['demographics.parquet', 'economy.parquet', 'geography.parquet', 'hospitalizations.parquet', 'mobility.parquet', 
         'google-search-trends.parquet', 'lawatlas-emergency-declarations.parquet', 'epidemiology.parquet', 
         'health.parquet', 'vaccinations.parquet', 'weather.parquet', 'by-age.parquet', 'by-sex.parquet']

# GCS prefix holding the raw Parquet files. Keep the trailing slash: file paths
# are built by plain string concatenation (f"{gcs_path}{file}").
gcs_path = "gs://cv19-453102-bucket/raw/"



In [None]:
# Tear down Spark left over from a previous run of this notebook so the
# SparkContext cell below can be created fresh with the GCS configuration.
spark = SparkSession.getActiveSession()

if spark:
    spark.stop()
    print("Stopped session")
elif SparkContext._active_spark_context is not None:
    # getActiveSession() returns None when a SparkContext exists without an
    # active SparkSession (e.g. the SparkContext cell ran but the session cell
    # did not). That lingering context would make SparkContext(conf=conf) fail.
    SparkContext._active_spark_context.stop()
    print("Stopped lingering SparkContext")
else:
    print("No active Spark session or context")

In [None]:
# Service-account key and GCS connector jar locations. Each can be overridden by
# an environment variable so the notebook is not tied to one machine; the
# defaults keep the original local setup working unchanged.
credentials_location = os.environ.get(
    "GOOGLE_APPLICATION_CREDENTIALS",
    r"C:\Users\nbwan\cv19pipeline\airflow\keys\my-creds.json",
)
gcs_connector_jar = os.environ.get(
    "GCS_CONNECTOR_JAR",
    r"C:\Users\nbwan\cv19pipeline\lib\gcs-connector3-2.2.5.jar",
)

# Local Spark with the GCS connector on the classpath, authenticating with the
# service-account JSON key (spark.hadoop.* entries are copied into the Hadoop config).
conf = (
    SparkConf()
    .setMaster('local[*]')
    .setAppName('test')
    .set("spark.jars", gcs_connector_jar)
    .set("spark.hadoop.google.cloud.auth.service.account.enable", "true")
    .set("spark.hadoop.google.cloud.auth.service.account.json.keyfile", credentials_location)
)

In [None]:
# Start the SparkContext from `conf`.
sc = SparkContext(conf=conf)

# Register the GCS connector's filesystem classes for the gs:// scheme and point
# it at the same service-account key. `_jsc` is PySpark's handle to the JVM
# context; it is the usual way to reach the live Hadoop configuration.
hadoop_conf = sc._jsc.hadoopConfiguration()
gcs_settings = {
    "fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS",
    "fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
    "fs.gs.auth.service.account.json.keyfile": credentials_location,
    "fs.gs.auth.service.account.enable": "true",
}
for setting_name, setting_value in gcs_settings.items():
    hadoop_conf.set(setting_name, setting_value)

In [None]:
# Build a SparkSession from the running SparkContext's configuration.
spark = (
    SparkSession.builder
    .config(conf=sc.getConf())
    .getOrCreate()
)

In [None]:
# Load the index table (the base the other tables are joined onto). Build the
# URI from the config values instead of repeating the literal bucket path.
df_google_open_data = spark.read.parquet(f"{gcs_path}{starting_file}")
df_google_open_data.show()

In [None]:
from pyspark.sql.functions import broadcast

def data_unification(df_google_open_data, files, gcs_path, how="inner",
                     join_keys=("location_key", "date"), broadcast_row_limit=500_000):
    """
    Read Parquet tables from GCS and join each one onto df_google_open_data.

    Each table is joined on ``join_keys[0]`` (``location_key`` by default) plus
    every other key in ``join_keys`` that BOTH sides already have. Joining two
    frames that each carry a ``date`` column on ``location_key`` alone would
    pair every date of one with every date of the other for the same location
    (row explosion) and leave two ambiguous ``date`` columns; including
    ``date`` in the join key avoids that.

    Args:
    - df_google_open_data (DataFrame): Initial DataFrame to join onto; must
      contain ``join_keys[0]``.
    - files (list): Parquet filenames, relative to ``gcs_path``.
    - gcs_path (str): GCS prefix where the files are stored (trailing '/').
    - how (str): Spark join type. "inner" (default) keeps only rows whose keys
      appear in every joined table.
    - join_keys (tuple): Candidate join columns. The first is required in every
      table (tables without it are skipped); the rest are used only when both
      sides have them.
    - broadcast_row_limit (int | None): Tables with fewer rows than this are
      broadcast. Checking runs a ``count()`` (a full scan) per table; pass None
      to skip the check and leave the join strategy to Spark.

    Returns:
    - DataFrame: Unified DataFrame with every usable file joined.
    """
    primary_key = join_keys[0]
    unified = df_google_open_data

    for file in files:
        file_path = f"{gcs_path}{file}"

        # Parquet stores its own schema, so no inference is needed.
        df = spark.read.parquet(file_path)

        # A table without the primary key cannot be aligned; skip it.
        if primary_key not in df.columns:
            print(f"Skipping {file}: '{primary_key}' column missing.")
            continue

        keys = [primary_key] + [
            k for k in join_keys[1:] if k in df.columns and k in unified.columns
        ]

        # Non-key columns present on both sides survive the join twice and
        # become ambiguous to reference later; surface them early.
        overlap = (set(df.columns) & set(unified.columns)) - set(keys)
        if overlap:
            print(f"Warning: {file} shares non-key columns {sorted(overlap)} with the unified DataFrame.")

        if broadcast_row_limit is not None and df.count() < broadcast_row_limit:
            df = broadcast(df)

        unified = unified.join(df, on=keys, how=how)
        print(f"Inserted {file} into df_google_open_data (joined on {keys})")

    return unified


In [None]:
# Join every table listed in `files` onto the index table.
df_unified = data_unification(
    df_google_open_data,
    files=files,
    gcs_path=gcs_path,
)


In [None]:
# Preview only the first 10 rows of the unified table.
(
    df_unified
    .limit(10)
    .show()
)

