In [16]:
# import findspark
# from pyspark.sql import SparkSession
# import pyspark.sql.functions as F
# import pyspark.sql.types as T
# import os

In [17]:
# findspark.init()
# spark = (SparkSession
#         .builder
#         .appName("transformations")
#         .master("local[*]")
#         .getOrCreate()
#         )

In [18]:
"""
Create a bucket (if not exists) in Google Cloud Storage
Download JSON file in bucket (created in preceding step)
"""

import requests
from google.cloud import storage
import json
import io

API_URL = "https://api.covidtracking.com/v2/us/daily.json"
GCP_BUCKET_NAME = "covid19data_ss"
BQ_WRITE_BUCKET_NAME = "covid19data_ss_bq_write"
DATA_DESTINATION_NAME = "covidtrackingdata.json"
SERVICE_ACCOUNT_JSON_PATH = "/home/infernape/gcp-projects/covid19datapipeline/service_account_secrets.json"
PROJECT_ID = "sunlit-vortex-394519"
client = storage.Client.from_service_account_json(SERVICE_ACCOUNT_JSON_PATH)

# Create the bucket if it doesn't exist
def create_bucket_if_not_exists(bucket_name, project_id, storage_client=None):
    """Ensure a GCS bucket named ``bucket_name`` exists, creating it if needed.

    Args:
        bucket_name: Name of the bucket to look up or create.
        project_id: GCP project passed to ``create_bucket`` when the bucket
            has to be created.
        storage_client: Optional storage client to use. Defaults to the
            module-level ``client``.

    Returns:
        The existing bucket, or the newly created one.

    Only a missing bucket triggers creation. Any other error raised while
    looking the bucket up (bad credentials, permission denied, network
    failure) propagates, instead of being masked by a create attempt.
    """
    gcs = client if storage_client is None else storage_client
    # lookup_bucket returns None for a missing bucket rather than raising
    bucket = gcs.lookup_bucket(bucket_name)
    if bucket is not None:
        print(f"Bucket {bucket_name} already exists.")
        return bucket
    bucket = gcs.create_bucket(bucket_name, project=project_id)
    print(f"Bucket {bucket_name} created.")
    return bucket

create_bucket_if_not_exists(GCP_BUCKET_NAME, PROJECT_ID)
create_bucket_if_not_exists(BQ_WRITE_BUCKET_NAME, PROJECT_ID)

# Fetch JSON data from the API. requests has no default timeout, so without
# one this cell could block forever if the API stops responding.
REQUEST_TIMEOUT_SECONDS = 60
response = requests.get(API_URL, timeout=REQUEST_TIMEOUT_SECONDS)
response.raise_for_status()
data = response.json()["data"]
json_data = json.dumps(data)

# Look up the destination bucket and object (uses the client created above)
bucket = client.get_bucket(GCP_BUCKET_NAME)
blob = bucket.blob(DATA_DESTINATION_NAME)

# Save the JSON data to GCS. Uploading replaces any existing object with the
# same name, so no delete is needed first; deleting beforehand would lose the
# previous copy if the upload then failed. The str is encoded as UTF-8.
blob.upload_from_string(json_data, content_type='application/json')

print(f"JSON data saved to gs://{GCP_BUCKET_NAME}/{DATA_DESTINATION_NAME}")

Bucket covid19data_ss already exists.
Bucket covid19data_ss_bq_write already exists.
JSON data saved to gs://covid19data_ss/covidtrackingdata.json


In [None]:
# stop the dataproc cluster



In [10]:
# """
# Read downloaded JSON file from GCS bucket
# Convert into Spark dataframe and perform transformations
# """

# # JSON_FILE_PATH = "/home/infernape/gcp-projects/covid19datapipeline/test/covidtrackingdata.json"

# # df = spark.read.json(JSON_FILE_PATH)

# INPUT_FILE = "/home/infernape/gcp-projects/covid19datapipeline/test/test.txt"
# OUTPUT_FILE = "/home/infernape/gcp-projects/covid19datapipeline/test/covidtrackingdata.json"

# rdd = spark.sparkContext.textFile(INPUT_FILE)

# def replace_content(line):
#     line = line.replace("'", '"')  # Convert single quotes to double quotes
#     line = line.replace(": None", ": null")  # Convert None to null
#     return line

# processed_rdd = rdd.map(replace_content)
# df = spark.read.json(processed_rdd)
# df.show(1)
# # df.write.json(OUTPUT_FILE)

In [11]:
# from google.cloud import dataproc_v1 as dataproc
# from google.cloud import storage
# from time import sleep

# # TODO: Replace these with your values
# PROJECT_ID = 'sunlit-vortex-394519'
# REGION = 'us-central1'
# CLUSTER_NAME = 'covidprocess'
# GCS_BUCKET = 'covid19data_ss'
# GCS_INPUT_PATH = f'gs://{GCS_BUCKET}/covidtrackingdata.txt'
# GCS_OUTPUT_PATH = f'gs://{GCS_BUCKET}/output.json'
# SERVICE_ACCOUNT_JSON_PATH = 'path_to_service_account_key.json'

# # Initialize Dataproc and Storage clients
# cluster_client = dataproc.ClusterControllerClient()
# job_client = dataproc.JobControllerClient()

# # Create cluster config
# cluster_config = {
#     'project_id': PROJECT_ID,
#     'cluster_name': CLUSTER_NAME,
#     'config': {
#         'master_config': {
#             'num_instances': 1,
#             'machine_type_uri': 'n1-standard-1'
#         },
#         'worker_config': {
#             'num_instances': 0
#         },
#         'gce_cluster_config': {
#             'service_account_scopes': [
#                 'https://www.googleapis.com/auth/cloud-platform'
#             ]
#         }
#     }
# }

# # Create cluster
# print("Creating cluster...")
# cluster = cluster_client.create_cluster(PROJECT_ID, REGION, cluster_config)
# cluster_id = cluster.cluster_uuid

# # Ensure cluster is up before submitting a job
# while True:
#     cluster_info = cluster_client.get_cluster(PROJECT_ID, REGION, CLUSTER_NAME)
#     if cluster_info.status.state.name == 'RUNNING':
#         break
#     sleep(10)

# # PySpark script to process data
# pyspark_script = f"""
# from pyspark.sql import SparkSession

# spark = SparkSession.builder.appName("ProcessingData").getOrCreate()

# # Read data
# rdd = spark.sparkContext.textFile("{GCS_INPUT_PATH}")

# # Process data
# processed_rdd = rdd.map(lambda x: x.replace("'", '"').replace("None", "null"))

# # Convert RDD to DataFrame (assuming each line is valid JSON after transformation)
# df = spark.read.json(processed_rdd)

# # Save as JSON
# df.write.json("{GCS_OUTPUT_PATH}")

# spark.stop()
# """

# # Save PySpark script to GCS
# gcs_client = storage.Client.from_service_account_json(SERVICE_ACCOUNT_JSON_PATH)
# bucket = gcs_client.bucket(GCS_BUCKET)
# blob = bucket.blob('script/process_data.py')
# blob.upload_from_string(pyspark_script)

# # Submit job to Dataproc
# print("Submitting job...")
# job_config = {
#     'placement': {
#         'cluster_name': CLUSTER_NAME
#     },
#     'pyspark_job': {
#         'main_python_file_uri': f'gs://{GCS_BUCKET}/script/process_data.py'
#     }
# }
# job = job_client.submit_job_as_operation(PROJECT_ID, REGION, job_config)
# job_id = job.name

# # Wait for job completion
# while True:
#     job_info = job_client.get_job(PROJECT_ID, REGION, job_id)
#     if job_info.status.state.name == 'DONE':
#         break
#     sleep(10)

# # Delete the cluster
# print("Deleting cluster...")
# cluster_client.delete_cluster(PROJECT_ID, REGION, CLUSTER_NAME)
# print("Cluster deleted.")


In [12]:
# import json
# INPUT_FILE = "/home/infernape/gcp-projects/covid19datapipeline/test/test.txt"
# with open(INPUT_FILE, 'r') as file:
#      file_content = file.read()
        
# file_content_trnsfm = file_content.replace("'",'"').replace(": None", ": null")
# # print(file_content_trnsfm)

# data = json.loads(file_content_trnsfm)

# with open('output_file.json', 'w') as outfile:
#     json.dump(data, outfile)

In [15]:
# from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

# schema = StructType([
#     StructField("date", StringType(), True),
#     StructField("states", IntegerType(), True),
#     StructField("cases", StructType([
#         StructField("total", StructType([
#             StructField("value", IntegerType(), True),
#             StructField("calculated", StructType([
#                 StructField("population_percent", DoubleType(), True),
#                 StructField("change_from_prior_day", IntegerType(), True),
#                 StructField("seven_day_change_percent", DoubleType(), True)
#             ]))
#         ]))
#     ])),
#     StructField("testing", StructType([
#         StructField("total", StructType([
#             StructField("value", IntegerType(), True),
#             StructField("calculated", StructType([
#                 StructField("population_percent", DoubleType(), True),
#                 StructField("change_from_prior_day", IntegerType(), True),
#                 StructField("seven_day_change_percent", DoubleType(), True)
#             ]))
#         ]))
#     ])),
#     StructField("outcomes", StructType([
#         StructField("hospitalized", StructType([
#             StructField("currently", StructType([
#                 StructField("value", IntegerType(), True),
#                 StructField("calculated", StructType([
#                     StructField("population_percent", DoubleType(), True),
#                     StructField("change_from_prior_day", IntegerType(), True),
#                     StructField("seven_day_change_percent", DoubleType(), True),
#                     StructField("seven_day_average", IntegerType(), True)
#                 ]))
#             ])),
#             StructField("in_icu", StructType([
#                 StructField("currently", StructType([
#                     StructField("value", IntegerType(), True),
#                     StructField("calculated", StructType([
#                         StructField("population_percent", DoubleType(), True),
#                         StructField("change_from_prior_day", IntegerType(), True),
#                         StructField("seven_day_change_percent", DoubleType(), True),
#                         StructField("seven_day_average", IntegerType(), True)
#                     ]))
#                 ]))
#             ])),
#             StructField("on_ventilator", StructType([
#                 StructField("currently", StructType([
#                     StructField("value", IntegerType(), True),
#                     StructField("calculated", StructType([
#                         StructField("population_percent", DoubleType(), True),
#                         StructField("change_from_prior_day", IntegerType(), True),
#                         StructField("seven_day_change_percent", DoubleType(), True),
#                         StructField("seven_day_average", IntegerType(), True)
#                     ]))
#                 ]))
#             ]))
#         ])),
#         StructField("death", StructType([
#             StructField("total", StructType([
#                 StructField("value", IntegerType(), True),
#                 StructField("calculated", StructType([
#                     StructField("population_percent", DoubleType(), True),
#                     StructField("change_from_prior_day", IntegerType(), True),
#                     StructField("seven_day_change_percent", DoubleType(), True),
#                     StructField("seven_day_average", IntegerType(), True)
#                 ]))
#             ]))
#         ]))
#     ]))
# ])

In [14]:
# df = spark.read.json("/home/infernape/gcp-projects/covid19datapipeline/test/test.json", schema = schema)