## **Loading Data to BigQuery**

This notebook performs the following operations:

1. Sets up the environment: creates the GCS bucket and the BigQuery datasets.
2. Loads the data into the BigQuery (BQ) Bronze layer.


<table align="left">
  <td style="text-align: center">
    <a href="https://colab.research.google.com/github/smvinodkumar910/market-mirror/blob/main/backend/01_load_data.ipynb">
      <img width="32px" src="https://www.gstatic.com/pantheon/images/bigquery/welcome_page/colab-logo.svg" alt="Google Colaboratory logo"><br> Run in Colab
    </a>
  </td>
  <td style="text-align: center">
    <a href="https://console.cloud.google.com/vertex-ai/colab/import/https:%2F%2Fraw.githubusercontent.com%2Fsmvinodkumar910%2Fmarket-mirror%2Frefs%2Fheads%2Fmain%2Fbackend%2F01_load_data.ipynb">
      <img width="32px" src="https://lh3.googleusercontent.com/JmcxdQi-qOpctIvWKgPtrzZdJJK-J3sWE1RsfjZNwshCFgE_9fULcNpuXYTilIR2hjwN" alt="Google Cloud Colab Enterprise logo"><br> Run in Colab Enterprise
    </a>
  </td>
  <td style="text-align: center">
    <a href="https://console.cloud.google.com/vertex-ai/workbench/deploy-notebook?download_url=https://raw.githubusercontent.com/smvinodkumar910/market-mirror/refs/heads/main/backend/01_load_data.ipynb">
      <img src="https://www.gstatic.com/images/branding/gcpiconscolors/vertexai/v1/32px.svg" alt="Vertex AI logo"><br> Open in Vertex AI Workbench
    </a>
  </td>    
  <td style="text-align: center">
    <a href="https://github.com/smvinodkumar910/market-mirror/blob/main/backend/01_load_data.ipynb">
      <img width="32px" src="https://www.svgrepo.com/download/475654/github-color.svg" alt="GitHub logo"><br> View on GitHub
    </a>
  </td>
</table>

### Authenticate your notebook environment (Colab only)

If you are running this notebook on Google Colab, run the following cell to authenticate your environment.


In [None]:
import sys

if "google.colab" in sys.modules:
    # Support for third party widgets
    from google.colab import auth, output

    auth.authenticate_user()
    output.enable_custom_widget_manager()

### Setting-up Environment

* Change the `PROJECT_ID`, `BUCKET_NAME`, and `LOCATION` variables to match your own project.

* This project follows the Medallion architecture to load and process data in the BQ data warehouse.

* Three BQ datasets are therefore defined below: `BQ_BRONZE_DATASET`, `BQ_SILVER_DATASET`, and `BQ_GOLD_DATASET`.

* You can keep the default names to load and process data in the respective datasets, or change them if you prefer different dataset names.

In [None]:
import os

PROJECT_ID = "market-mirror-dev"  # @param {type: "string", placeholder: "[your-project-id]", isTemplate: true}
BUCKET_NAME = "marke-mirror-dev-data"  # @param {type: "string", placeholder: "[your-bucket-name]", isTemplate: true}
LOCATION = "us-central1"  # @param {type: "string", placeholder: "[your-region]", isTemplate: true}
if not PROJECT_ID or PROJECT_ID == "[your-project-id]":
    PROJECT_ID = str(os.environ.get("GOOGLE_CLOUD_PROJECT"))

if not LOCATION or LOCATION == "[your-region]":
    LOCATION = os.environ.get("GOOGLE_CLOUD_REGION", "us-central1")
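
One caveat in the cell above: `str(os.environ.get("GOOGLE_CLOUD_PROJECT"))` evaluates to the string `"None"` when the variable is unset, and that string would then be passed on as if it were a real project ID. The sketch below shows one possible stricter fallback. `resolve_setting` is a hypothetical helper introduced here for illustration; it is not part of the notebook.

```python
import os


def resolve_setting(value, placeholder, env_var, default=None):
    """Return `value` unless it is empty or still the template placeholder.

    Falls back to the environment variable `env_var`, then to `default`.
    Raises ValueError when nothing usable is found.
    """
    if value and value != placeholder:
        return value
    fallback = os.environ.get(env_var) or default
    if not fallback:
        raise ValueError(f"Set a value explicitly or export {env_var}.")
    return fallback


# Hypothetical usage with the notebook's variable names and example values.
project_id = resolve_setting(
    "market-mirror-dev", "[your-project-id]", "GOOGLE_CLOUD_PROJECT"
)
location = resolve_setting(
    "", "[your-region]", "GOOGLE_CLOUD_REGION", default="us-central1"
)
```

Failing early with a `ValueError` keeps a missing project ID from reaching the GCS and BigQuery clients created later in the notebook.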


In [None]:
BQ_BRONZE_DATASET = "APP_MARKET_BRONZE" # @param {type: "string", placeholder: "[bronze-dataset]", isTemplate: true}
BQ_SILVER_DATASET = "APP_MARKET_SILVER" # @param {type: "string", placeholder: "[silver-dataset]", isTemplate: true}
BQ_GOLD_DATASET = "APP_MARKET_GOLD" # @param {type: "string", placeholder: "[gold-dataset]", isTemplate: true}
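
To make the layer layout concrete, here is a minimal sketch of how fully qualified table IDs could be derived from the three dataset names. The layer descriptions in the comments follow the usual reading of the Medallion architecture, and both `table_id` and the `reviews` table name are hypothetical, chosen only for illustration.

```python
# Dataset names mirror the notebook's defaults.
LAYER_DATASETS = {
    "bronze": "APP_MARKET_BRONZE",  # raw data, loaded as-is
    "silver": "APP_MARKET_SILVER",  # cleaned and conformed data
    "gold": "APP_MARKET_GOLD",      # aggregated, consumption-ready data
}


def table_id(project_id, layer, table_name):
    """Build a fully qualified `project.dataset.table` ID for a layer."""
    return f"{project_id}.{LAYER_DATASETS[layer]}.{table_name}"


# "reviews" is a made-up table name for this example.
print(table_id("market-mirror-dev", "bronze", "reviews"))
```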

#### Prepare GCS

* This section creates the GCS bucket required for uploading the raw files, if it does not already exist.

In [None]:
from google.cloud import storage
from google.cloud.exceptions import NotFound

gcs_client = storage.Client(
    project=PROJECT_ID
)

try:
  databucket = gcs_client.get_bucket(BUCKET_NAME)
  bucket_exists = True
except NotFound:
  databucket = gcs_client.create_bucket(BUCKET_NAME, project=PROJECT_ID)
  bucket_exists = True
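
The get-or-create logic above can also be wrapped in a small function. The following is a sketch, not the notebook's own method: `get_or_create_bucket` is a hypothetical helper, and it assumes the client behaves like `google.cloud.storage.Client`, whose `lookup_bucket` returns `None` for a missing bucket and whose `create_bucket` accepts an optional `location`.

```python
def get_or_create_bucket(client, bucket_name, location=None):
    """Return the bucket, creating it when it does not exist yet.

    `client` is expected to behave like `google.cloud.storage.Client`.
    """
    bucket = client.lookup_bucket(bucket_name)
    if bucket is None:
        # Only reached when the bucket is missing.
        bucket = client.create_bucket(bucket_name, location=location)
    return bucket


# Possible usage with the notebook's variables (not executed here):
# databucket = get_or_create_bucket(gcs_client, BUCKET_NAME, location=LOCATION)
```

Passing `location` explicitly is a design choice of this sketch; the cell above creates the bucket without one.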

#### Prepare BigQuery Datasets

* This section creates the required BQ datasets if they do not already exist.

In [None]:
from google.cloud import bigquery

bq_client = bigquery.Client(project=PROJECT_ID)

bq_client.create_dataset(BQ_BRONZE_DATASET,exists_ok=True)
bq_client.create_dataset(BQ_SILVER_DATASET,exists_ok=True)
bq_client.create_dataset(BQ_GOLD_DATASET,exists_ok=True)

### Data Load Steps

#### Data Definitions

This project uses five Kaggle datasets.

**Reviews Datasets:**
* These datasets will be used to explore the sentiment of user reviews on Google Play Store apps.
1. https://www.kaggle.com/datasets/lava18/google-play-store-apps
2. https://www.kaggle.com/datasets/marianna13/google-play-reviews

**Product Details Datasets:**
* These datasets will be used to compare the products in the Google Play Store with competing products on other platforms such as Apple and Windows.
1. https://www.kaggle.com/datasets/maryamsayagh1/google-play-store-apps
2. https://www.kaggle.com/datasets/ramamet4/app-store-apple-data-set-10k-apps
3. https://www.kaggle.com/datasets/quadeer15sh/windows-store-top-apps-games


#### Download Data from Kaggle

* In this section, we are downloading data from Kaggle datasets.

In [None]:
import kagglehub
import os, glob

product_datasets_list = [
'https://www.kaggle.com/datasets/maryamsayagh1/google-play-store-apps',
'https://www.kaggle.com/datasets/ramamet4/app-store-apple-data-set-10k-apps',
'https://www.kaggle.com/datasets/quadeer15sh/windows-store-top-apps-games']


reviews_datasets_list = ['https://www.kaggle.com/datasets/lava18/google-play-store-apps',
'https://www.kaggle.com/datasets/marianna13/google-play-reviews']

* The step below downloads all review datasets from Kaggle to a local path.

In [None]:
review_local_paths = []
for dataset in reviews_datasets_list:
  dataset_path = dataset.replace('https://www.kaggle.com/datasets/','')
  dataset_name = dataset.split('/')[-1]
  path = kagglehub.dataset_download(dataset_path)
  files_path = glob.glob(pathname=os.path.join(path,'*'))
  review_local_paths.append({'dataset_name': dataset_name, 'path':files_path})

* The step below downloads all product datasets from Kaggle to a local path.

In [None]:
product_local_paths = []
for dataset in product_datasets_list:
  dataset_path = dataset.replace('https://www.kaggle.com/datasets/','')
  dataset_name = dataset.split('/')[-1]
  path = kagglehub.dataset_download(dataset_path)
  files_path = glob.glob(pathname=os.path.join(path,'*'))
  product_local_paths.append({'dataset_name': dataset_name, 'path':files_path})
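
Both download loops turn a dataset URL into the `owner/dataset` handle passed to `kagglehub.dataset_download`, and take the last path segment as the dataset name. A minimal sketch of that parsing as reusable functions follows. `kaggle_handle` and `dataset_name` are hypothetical helpers, and the URL validation is an addition that the notebook does not perform.

```python
from urllib.parse import urlparse


def kaggle_handle(url):
    """Extract the `owner/dataset` handle from a Kaggle dataset URL."""
    parts = urlparse(url).path.strip("/").split("/")
    if len(parts) != 3 or parts[0] != "datasets":
        raise ValueError(f"Not a Kaggle dataset URL: {url}")
    return f"{parts[1]}/{parts[2]}"


def dataset_name(url):
    """Return the last path segment, used as the dataset's folder name."""
    return kaggle_handle(url).split("/")[-1]


url = "https://www.kaggle.com/datasets/lava18/google-play-store-apps"
print(kaggle_handle(url), dataset_name(url))
```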

#### Upload Data to GCS

* The step below uploads the files stored in the local filesystem to GCS.

In [None]:
# Upload the review datasets
review_gcs_files = []
if bucket_exists:
  for file in review_local_paths:
    dataset_name = file.get('dataset_name')
    paths = file.get('path')
    for path in paths:
      file_name = path.split('/')[-1]
      destination_blob_name = os.path.join('review_dataset',dataset_name,file_name)
      destination_blob = databucket.blob(destination_blob_name)
      destination_blob.upload_from_filename(path)
      review_gcs_files.append(f"gs://{BUCKET_NAME}/{destination_blob_name}")
      print(
          f"File {path} uploaded to gs://{BUCKET_NAME}/{destination_blob_name}."
      )

In [None]:
#Uploading product datasets
product_gcs_files = []
if bucket_exists:
  for file in product_local_paths:
    dataset_name = file.get('dataset_name')
    paths = file.get('path')
    for path in paths:
      file_name = path.split('/')[-1]
      destination_blob_name = os.path.join('product_dataset',dataset_name,file_name)
      destination_blob = databucket.blob(destination_blob_name)
      destination_blob.upload_from_filename(path)
      product_gcs_files.append(f"gs://{BUCKET_NAME}/{destination_blob_name}")
      print(
          f"File {path} uploaded to gs://{BUCKET_NAME}/{destination_blob_name}."
      )
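
The upload cells build object names with `os.path.join` and `path.split('/')`, which assumes a POSIX-style local filesystem. As a hedged alternative, the sketch below always joins with forward slashes and lets `pathlib` extract the file name. `build_blob_name` and `gcs_uri` are hypothetical helpers, and the local path and bucket name in the example are made up.

```python
import posixpath
from pathlib import Path


def build_blob_name(prefix, dataset_name, local_path):
    """Join the parts of a GCS object name with forward slashes."""
    return posixpath.join(prefix, dataset_name, Path(local_path).name)


def gcs_uri(bucket_name, blob_name):
    """Format a `gs://` URI for an object in a bucket."""
    return f"gs://{bucket_name}/{blob_name}"


# Made-up local path and bucket name, for illustration only.
blob_name = build_blob_name(
    "review_dataset", "google-play-store-apps", "/tmp/data/googleplaystore.csv"
)
print(gcs_uri("demo-bucket", blob_name))
```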

#### Write Data to BigQuery Bronze Layer

* This step loads data from the GCS bucket into BQ using BigQuery DataFrames (`bigframes`).

In [None]:
import bigframes.pandas as bpd

bpd.options.bigquery.project = PROJECT_ID
bpd.options.bigquery.dataset = BQ_BRONZE_DATASET

* Keep only the review files that have a `.csv` extension and whose file name contains `review`, then load them into the BQ Bronze layer.

In [None]:
review_gcs_files_filtered = [{'file_name':file.split('/')[-1].split('.')[0], 'gcs_path': file} for file in review_gcs_files if (file.endswith('.csv')  and 'review' in file.split('/')[-1].split('.')[0] ) ]
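
The one-line filter above can be hard to read, so here is the same selection rule spelled out as a function. This is a sketch under assumptions: `select_csv_files` is a hypothetical helper, and the sample URIs are invented. Note that the `review` check applies to the file name only, not to the `review_dataset` folder in the path.

```python
def select_csv_files(gcs_uris, name_contains=None):
    """Return [{'file_name': stem, 'gcs_path': uri}] for matching .csv URIs."""
    selected = []
    for uri in gcs_uris:
        file_name = uri.split("/")[-1]
        # Same rule as the notebook: the table name is the text before the first dot.
        stem = file_name.split(".")[0]
        if not file_name.endswith(".csv"):
            continue
        if name_contains and name_contains not in stem:
            continue
        selected.append({"file_name": stem, "gcs_path": uri})
    return selected


# Invented URIs, for illustration only.
sample_uris = [
    "gs://demo-bucket/review_dataset/demo/app_reviews.csv",
    "gs://demo-bucket/review_dataset/demo/apps.csv",
    "gs://demo-bucket/review_dataset/demo/license.txt",
]
print(select_csv_files(sample_uris, name_contains="review"))
```

Calling the function without `name_contains` keeps every `.csv` file, which matches the filter used for the product datasets.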

In [None]:
#start processing files
for file_dtl in review_gcs_files_filtered:
  df = bpd.read_csv(file_dtl.get('gcs_path'))
  df.to_gbq(f'{BQ_BRONZE_DATASET}.{file_dtl.get("file_name")}', if_exists='replace')


* Keep only the product data files with a `.csv` extension and load them into the BQ Bronze layer with Dataproc Serverless.

In [None]:
product_gcs_files_filtered = [{'file_name':file.split('/')[-1].split('.')[0], 'gcs_path': file} for file in product_gcs_files if file.endswith('.csv') ]

In [None]:
from google.cloud.dataproc_spark_connect import DataprocSparkSession
from google.cloud.dataproc_v1 import Session


# This will create a default Spark session
spark = DataprocSparkSession.builder.getOrCreate()


In [None]:
for file in product_gcs_files_filtered:
  table_name = file.get('file_name')
  print(table_name)
  df = spark.read\
  .option("multiLine", "true")\
  .option("quote", "\"")\
  .option("escape", '"')\
  .csv(file.get('gcs_path'),
        inferSchema=True,
        header=True)
  # Map each column name to a copy with spaces and dots replaced by underscores
  all_col_rename = {column: column.replace(' ', '_').replace('.', '_') for column in df.columns}
  df = df.withColumnsRenamed(all_col_rename)
  df.write.mode("overwrite").format('bigquery').save(f'{PROJECT_ID}.{BQ_BRONZE_DATASET}.{table_name}')
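
The renaming step in the loop above replaces spaces and dots in column names with underscores. The sketch below isolates that rule in plain Python and adds a check, not present in the notebook, for two source columns that collapse to the same new name. `build_rename_map` and `find_collisions` are hypothetical helpers, and the column names are invented.

```python
def build_rename_map(columns):
    """Map each column name to a copy with spaces and dots replaced by underscores."""
    return {name: name.replace(" ", "_").replace(".", "_") for name in columns}


def find_collisions(rename_map):
    """Return the new names that more than one original column maps to."""
    seen, collisions = set(), set()
    for new_name in rename_map.values():
        if new_name in seen:
            collisions.add(new_name)
        seen.add(new_name)
    return sorted(collisions)


# Invented column names, for illustration only.
rename_map = build_rename_map(["App Name", "App.Name", "Rating"])
print(rename_map)
print(find_collisions(rename_map))
```

A non-empty result from `find_collisions` would be a reason to inspect the source file before writing the table.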
