# GCP Pipeline

## In essence, the pipeline goes like this:
## Platform data -> Uploaded to a URL -> Cloud Function reads the file from the URL and executes the code to ingest it -> The file is stored in GCS -> It is unzipped -> Exported to BQ as a table under the respective dataset name.

-------

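## The respective platform calls the Cloud Function URL with a JSON body containing `ExportLocation` and `StartDate`; this URL is assigned as the trigger for the function to start working. The data file (name: Self_active_subscribers2022-06-13T23_00_00.000Z.zip) is then downloaded from `ExportLocation`.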

## The zip file is uploaded to the bucket called "platform_daily_active_file"; this is performed by code #1.

## However, the zip file must be unzipped before it can be ingested. Code #2 does exactly that.

## Code #3 loads the unzipped CSV file into BigQuery as a table, letting BigQuery auto-detect the schema.

-----------------

## #1

In [None]:
# Function dependencies, for example:
# package>=version
google-cloud-storage
requests

In [None]:
from google.cloud import storage
import requests

def csg_upload_subscriber(request):
    """HTTP Cloud Function: copy a platform export zip into Cloud Storage.

    The request must carry a JSON object with two keys:

    * ``ExportLocation`` - URL the zip archive is downloaded from.
    * ``StartDate`` - appended to ``Self_active_subscribers`` to build the
      name of the stored object.

    The downloaded bytes are written to the ``platform_daily_active_file``
    bucket with content type ``application/zip``.

    Args:
        request: Incoming request object; only ``get_json`` is called on it.

    Returns:
        A ``(message, status_code)`` tuple: status 400 when the JSON body is
        missing or incomplete, status 200 once the upload has finished.

    Raises:
        requests.HTTPError: If the download answers with an error status.
    """
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        # get_json(silent=True) yields None for a missing/invalid JSON body.
        payload = {}

    export_location = payload.get("ExportLocation")
    start_date = payload.get("StartDate")
    if not export_location or not start_date:
        return ("Request JSON must contain 'ExportLocation' and 'StartDate'.", 400)

    name = f"Self_active_subscribers{start_date}.zip"

    # The timeout bounds connection setup and gaps between received bytes,
    # not the total transfer time, so large exports are still allowed.
    response = requests.get(export_location, timeout=60)
    # Fail loudly rather than storing an error page as if it were the zip.
    response.raise_for_status()

    project_id = 'xx-x-xxx'
    client = storage.Client(project=project_id)
    bucket = client.get_bucket('platform_daily_active_file')
    bucket.blob(name).upload_from_string(
        response.content, content_type="application/zip"
    )

    # An HTTP function has to hand a response back to its caller.
    return (f"Uploaded {name}", 200)

## #2

In [None]:
# requirements.txt
# Function dependencies, for example:
# package>=version
google-cloud-storage
zipfile36
regex
pandas-io
pyzipper
pandas

In [None]:
from google.cloud import storage
from zipfile36 import ZipFile
from zipfile36 import is_zipfile
import pyzipper
import regex as re
import io
import pandas as pd
import csv


def _csv_text_to_frame(text):
    """Parse CSV text into a DataFrame whose first row is the header.

    Returns ``None`` when the text contains no rows at all.
    """
    # newline="" lets the csv module handle line endings itself, so quoted
    # fields that contain line breaks stay in one piece.
    rows = list(csv.reader(io.StringIO(text, newline="")))
    if not rows:
        return None

    header, *records = rows
    # Pad the header to the widest row so that rows with extra cells keep
    # their data (the extra columns simply have no name).
    width = max(len(row) for row in rows)
    header = header + [None] * (width - len(header))
    return pd.DataFrame(records, columns=header)


def unzip_file(
    event,
    context,
    ):
    """Extract the first member of an uploaded zip and store it as CSV.

    Written for a storage-triggered function on the
    ``platform_daily_active_file`` bucket. Only events whose
    ``metageneration`` is ``"1"`` and whose ``contentType`` is
    ``application/zip`` are processed; everything else is ignored.

    The archive is opened with pyzipper, its first member is decoded as
    UTF-8 CSV, and the result is written back to the same bucket under the
    member's name with content type ``text/csv``.

    Args:
        event: Event payload (dict); ``name``, ``metageneration`` and
            ``contentType`` are read from it.
        context: Event metadata; unused.

    Returns:
        None.
    """
    print("here")
    print(event)
    print(event["name"])

    source_file = event["name"]
    is_new_zip = (
        event.get("metageneration") == "1"
        and event.get("contentType") == "application/zip"
    )
    if not is_new_zip:
        # Checked before creating a client so ignored events stay cheap.
        return

    bucket = storage.Client().get_bucket("platform_daily_active_file")
    source_blob = bucket.get_blob(source_file)
    if source_blob is None:
        # The object disappeared between the event and this invocation.
        print("Object {} no longer exists.".format(source_file))
        return

    archive_bytes = io.BytesIO(source_blob.download_as_bytes())
    if not is_zipfile(archive_bytes):
        return

    with pyzipper.AESZipFile(archive_bytes, "r") as archive:
        # NOTE(review): hard-coded credential - move it to a secret store
        # or an environment variable.
        archive.pwd = b"pass@123"
        members = archive.namelist()
        if not members:
            print("Archive {} is empty.".format(source_file))
            return
        fname = members[0]
        text = archive.read(fname).decode("utf-8")

    frame = _csv_text_to_frame(text)
    if frame is None:
        print("File {} contains no rows.".format(fname))
        return

    bucket.blob(fname).upload_from_string(
        frame.to_csv(index=False), content_type="text/csv"
    )
                

## #3

In [None]:
# Function dependencies, for example:
# package>=version
google-cloud-bigquery
google-cloud-storage

In [None]:
#!/usr/bin/python
# -*- coding: utf-8 -*-
from google.cloud import bigquery

def csg_subscriber_upload(event, context):
    """Load a CSV object from Cloud Storage into a BigQuery table.

    Written for a storage-triggered function on the
    ``platform_daily_active_file`` bucket. Events whose ``contentType`` is
    not ``text/csv`` are ignored.

    The destination table lives in the ``client_platform`` dataset and is
    named ``Platform_Subscribers_Info`` followed by the second, third and
    fourth ``_``-separated pieces of the object name. The schema is
    auto-detected, the table is created if needed, and existing contents
    are overwritten.

    Args:
        event: Event payload (dict); ``name`` and ``contentType`` are read.
        context: Event metadata; unused.

    Returns:
        None.

    Raises:
        ValueError: If the object name has fewer than four ``_``-separated
            pieces, so no table name can be derived from it.
    """
    print("step 1")
    file = event
    print(file)
    source_file = str(file['name'])
    print(source_file)
    uri = 'gs://platform_daily_active_file/' + source_file

    if file.get('contentType') != 'text/csv':
        print('Not a CSV File')
        return

    print("step 2")
    dataset = 'client_platform'
    name_parts = source_file.split('_')
    if len(name_parts) < 4:
        raise ValueError(
            'Cannot derive a table name from {!r}: expected at least four '
            "'_'-separated parts.".format(source_file)
        )
    table = 'Platform_Subscribers_Info' + ''.join(name_parts[1:4])
    print(table)

    bigquery_client = bigquery.Client()
    # Client.dataset() is deprecated; build the reference explicitly.
    table_ref = bigquery.DatasetReference(
        bigquery_client.project, dataset
    ).table(table)

    job_config = bigquery.LoadJobConfig(
        autodetect=True,
        create_disposition=bigquery.job.CreateDisposition.CREATE_IF_NEEDED,
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
    )

    load_job = bigquery_client.load_table_from_uri(
        uri, table_ref, job_config=job_config
    )
    load_job.result()  # Block until the load finishes; raises on failure.

    destination_table = bigquery_client.get_table(table_ref)
    print(file['name'])
    print('Loaded {} rows.'.format(destination_table.num_rows))
    print('Data uploaded into BigQuery table successfully.')
