## 1. Data Engineering - Process CSV files into BQ Tables via Spark

### Create Spark session with BQ connector

Create a Spark session with the spark-bigquery connector JAR on its classpath.

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import FloatType, IntegerType, StructField, StructType

# GCS location of the spark-bigquery connector JAR added to the session classpath.
# NOTE(review): "latest" is unpinned, so re-runs may pick up a different connector
# release; consider pinning a specific version that matches the cluster's Scala version.
BQ_CONNECTOR_JAR = 'gs://spark-lib/bigquery/spark-bigquery-latest.jar'

# getOrCreate() reuses an already-running session if there is one; in that case
# static settings such as spark.jars may not take effect until the kernel restarts.
spark = (
    SparkSession.builder
    .appName('Spark - Data Eng Demo')
    .config('spark.jars', BQ_CONNECTOR_JAR)
    .getOrCreate()
)

**TODO**
* Replace the `<bucket-path>` placeholder in the cells below with your GCS bucket.


In [None]:
!gsutil ls <bucket-path>

Preview the first ~1000 bytes of the training CSV on GCS (`-h` also prints a short header with the object name).

In [None]:
!gsutil cat -h -r 0-1000 <bucket-path>/transaction_data_train.csv

In [None]:
# GCS URI of the training CSV; <bucket-path> is the placeholder to replace with your bucket.
path_to_train_csv = "/".join(["gs://<bucket-path>", "transaction_data_train.csv"])

### Get Spark application ID 

The application ID makes it easy to find this application in the Spark History UI.

In [None]:
# Display this session's application ID, used to locate the run in the Spark History UI.
spark.conf.get("spark.app.id")

Load the CSV file into a Spark DataFrame. The first row is used as the header, and Spark infers the column types.

In [None]:
# Read the training CSV: the first row supplies column names, and Spark infers
# the column types (schema inference costs one extra pass over the data).
df_transaction_data_from_csv = spark.read.csv(
    path_to_train_csv,
    header=True,
    inferSchema=True,
)

In [None]:
# Inspect the column names and types produced by schema inference on the CSV.
df_transaction_data_from_csv.printSchema()

### Create the BQ dataset & table to persist data

**TODO** (Challenge 1)
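* Build an inline BigQuery schema string (`column:TYPE,column:TYPE,...`) from the Spark DataFrame's column types; it is passed to `bq mk --table` below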
    * Reference for converting data types: https://github.com/GoogleCloudDataproc/spark-bigquery-connector#data-types

In [None]:
# Build the inline BigQuery schema (`name:TYPE,name:TYPE,...`) that `bq mk --table`
# accepts, from the Spark DataFrame's column types. The mapping follows the
# spark-bigquery-connector data-type table.
SPARK_TO_BQ_TYPE = {
    "string": "STRING",
    "tinyint": "INT64",
    "smallint": "INT64",
    "int": "INT64",
    "bigint": "INT64",
    "float": "FLOAT64",
    "double": "FLOAT64",
    "boolean": "BOOL",
    "date": "DATE",
    "timestamp": "TIMESTAMP",
    "timestamp_ntz": "DATETIME",
    "binary": "BYTES",
}


def spark_type_to_bq(spark_type: str) -> str:
    """Map a Spark simple type string (as in ``DataFrame.dtypes``) to a BigQuery type.

    Args:
        spark_type: e.g. ``"int"``, ``"double"``, ``"decimal(10,2)"``.

    Returns:
        The BigQuery column type name.

    Raises:
        ValueError: if no mapping exists (e.g. nested array/struct/map types),
            rather than silently guessing a type.
    """
    if spark_type.startswith("decimal("):
        precision, scale = (int(part) for part in spark_type[len("decimal("):-1].split(","))
        # NUMERIC holds at most 29 integer digits and 9 fractional digits.
        return "NUMERIC" if scale <= 9 and precision - scale <= 29 else "BIGNUMERIC"
    try:
        return SPARK_TO_BQ_TYPE[spark_type]
    except KeyError:
        raise ValueError(f"No BigQuery type mapping for Spark type {spark_type!r}") from None


schema_inline = ",".join(
    f"{column}:{spark_type_to_bq(spark_type)}"
    for column, spark_type in df_transaction_data_from_csv.dtypes
)
schema_inline

In [None]:
# Preview the first 5 rows of the CSV data.
df_transaction_data_from_csv.show(5)

In [None]:
# create the name your BQ datasets
project_id = !gcloud config list --format 'value(core.project)' 2>/dev/null 
dataset_name_raw = project_id[0] + '-raw'
dataset_name_raw = dataset_name_raw.replace('-', '_')
dataset_name_raw

dataset_name_annotated = project_id[0] + '-annotated'
dataset_name_annotated = dataset_name_annotated.replace('-', '_')
dataset_name_annotated

dataset_name_enriched = project_id[0] + '-enriched '
dataset_name_enriched  = dataset_name_enriched.replace('-', '_')
dataset_name_enriched

Create each BigQuery dataset in the `europe-west3` location

In [None]:
!bq --location=europe-west3 mk -d \
{dataset_name_raw}

In [None]:
!bq --location=europe-west3 mk -d \
{dataset_name_annotated}

In [None]:
!bq --location=europe-west3 mk -d \
{dataset_name_enriched}

In [None]:
# create path to new table for creation
bq_table_name = 'transaction_data_train'
bq_table_path= dataset_name_raw + '.' + bq_table_name
bq_table_path

Create the BQ table using the inline schema built above

In [None]:
!bq mk --table \
{bq_table_path} \
{schema_inline}

#### Check that the table was created

In [None]:
# Connector table reference in `<project>:<dataset>.<table>` form.
table = f"{project_id[0]}:{bq_table_path}"

df_transaction_data_from_bq = (
    spark.read
    .format("bigquery")
    .option("table", table)
    .load()
)

In [None]:
# Schema of the BigQuery table as read through the connector; it should reflect the inline schema passed to `bq mk`.
df_transaction_data_from_bq.printSchema()

In [None]:
# Show the table's rows; expected to be empty here since the data is only written in a later cell.
df_transaction_data_from_bq.show()

Write the Spark DataFrame to the BQ table

In [None]:
# Name of the GCS bucket the connector stages data in when writing to BigQuery.
# This cell only builds the name; the bucket is assumed to already exist.
gcs_bucket = f"{project_id[0]}-data"
gcs_bucket

In [None]:
# Overwrite the BigQuery table with the CSV data, staging it through `gcs_bucket`.
writer = df_transaction_data_from_csv.write.format("bigquery")
writer = writer.options(table=table, temporaryGcsBucket=gcs_bucket)
writer.mode("overwrite").save()

Check that the BQ table is now populated

In [None]:
# Re-read the BigQuery table to confirm the write landed.
df_transaction_data_from_bq = (
    spark.read.format("bigquery").options(table=table).load()
)

In [None]:
# Show up to 20 rows (the show() default) of the populated table.
df_transaction_data_from_bq.show()

### Compute statistics for columns in table

In [None]:
# Summary statistics (count, mean, stddev, min, max) for the table's numeric and string columns.
df_transaction_data_from_bq.describe().show()