# Goal 

I will test BigQuery upload feature with Parquet file. (This parquet file is downloaded from https://github.com/Teradata/kylo/blob/master/samples/sample-data/parquet/userdata1.parquet)

BigQuery tries to detect schema information when to load Parquet format. 

I will show you how to make predefined schema before to load Parquet format. 

| column#		| column_name	 |	hive_datatype |
|-----------|--------------|----------------|
|1		|registration_dttm 	|timestamp|
|2		|id 			|int|
|3		|first_name 		|string|
|4		|last_name 		|string|
|5		|email 			|string|
|6		|gender 			|string|
|7		|ip_address 		|string|
|8		|cc 			|string|
|9		|country 		|string|
|10		|birthdate 		|string|
|11		|salary 			|double|
|12		|title 			|string|
|13		|comments 		|string|


In [16]:
#! pip3 install google.cloud
#! pip3 install google.cloud.storage
#! pip3 install google.cloud.bigquery

import os
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "<set your credential file path>"



In [22]:
# Load parquest to GCS

import os
from google.cloud import storage

# Create a bucket if it doesn't exist
bucket_name = "bigquery_parquet_upload_test_seoul"

storage_client = storage.Client()

bucket = storage_client.bucket(bucket_name)
if not bucket.exists():
    bucket.create(location="asia-northeast3")

# Upload a file to the bucket
file_name = "userdata1.parquet"
source_file_path = "../resources/" + file_name

blob = bucket.blob(file_name)
if not blob.exists():
  blob.upload_from_filename(source_file_path)
  print(f"File {file_name} uploaded to {bucket_name}.")
else:
  print(f"File {file_name} already uploaded.")
   


File userdata1.parquet uploaded to bigquery_parquet_upload_test_seoul.


## create table from loading.


In [23]:
from google.cloud import bigquery

# Construct a BigQuery client object.
client = bigquery.Client()

# TODO(developer): Set table_id to the ID of the table to create.
table_id = "parquet_test.parquet_default"

job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.PARQUET,
)
uri = "gs://{bucket_name}/{file_name}".format(bucket_name=bucket_name, file_name=file_name)

load_job = client.load_table_from_uri(
    uri, table_id, job_config=job_config
)  # Make an API request.


load_job.result()  # Waits for the job to complete.

destination_table = client.get_table(table_id)
print("Loaded {} rows.".format(destination_table.num_rows))

Loaded 1000 rows.


After loading, the generated table shows the following schema.

| Field name | Type | Mode | 
|------------|------|------|
|registration_dttm	|TIMESTAMP	|NULLABLE |
|id	|INTEGER	|NULLABLE				|
|first_name|	STRING	|NULLABLE|				
|last_name|	STRING	|NULLABLE		|		
|email	|STRING	|NULLABLE				|
|gender	|STRING	|NULLABLE				|
|ip_address	|STRING	|NULLABLE		|		
|cc	|STRING	|NULLABLE				|
|country	|STRING	|NULLABLE	|			
|birthdate	|STRING	|NULLABLE|				
|salary	|FLOAT	|NULLABLE			|	
|title	|STRING	|NULLABLE			|	
|comments	|STRING	|NULLABLE		|

I will try to modify the mode of 'id' field to be 'NOT NULL' and the type of 'salary' from 'FLOAT' to 'STRING'


In [26]:
table_id = "parquet_test.parquet_modified"

# Method 1. After create table manually and load from parquet file.

results = client.query("""
create table {table_id} 
(
registration_dttm	TIMESTAMP,
id	INTEGER	NOT NULL				,
first_name	STRING,
last_name	STRING,
email	STRING,
gender	STRING,
ip_address	STRING,
cc	STRING,
country	STRING,
birthdate	STRING,
salary	STRING,
title	STRING,
comments	STRING
)
""".format(table_id=table_id)).result()



OK. You can upload the content from parquet file into this new table.

In [27]:
job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.PARQUET,
)
uri = "gs://{bucket_name}/{file_name}".format(bucket_name=bucket_name, file_name=file_name)

load_job = client.load_table_from_uri(
    uri, table_id, job_config=job_config
)  


load_job.result()  # Waits for the job to complete.

destination_table = client.get_table(table_id)
print("Loaded {} rows.".format(destination_table.num_rows))

BadRequest: 400 Provided Schema does not match Table turnkey-charter-358922:parquet_test.parquet_modified. Field salary has changed type from STRING to FLOAT

Wow. it shows error!!

We need to specify configuration file into LoadJobConfig operation. 

https://stackoverflow.com/questions/60230068/error-field-x-has-changed-type-from-numeric-to-float-when-inserting-data-to-b

In [38]:
my_schema = [
  {
    "name": "registration_dttm",
    "type": "TIMESTAMP"
  },{
    "name": "id",
    "type": "INTEGER",
    "mode" : "REQUIRED"
  },{
    "name": "first_name",
    "type": "STRING"
  },{
    "name": "last_name",
    "type": "STRING"
  },{
    "name": "email",
    "type": "STRING"
  },{
    "name": "gender",
    "type": "STRING"
  },{
    "name": "ip_address",
    "type": "STRING"
  },{
    "name": "cc",
    "type": "STRING"
  },{
    "name": "country",
    "type": "STRING"
  },{
    "name": "birthdate",
    "type": "STRING"
  },{
    "name": "salary",
    "type": "NUMERIC"
  },{
    "name": "title",
    "type": "STRING"
  },{
    "name": "comments",
    "type": "STRING"
  }
]

job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.PARQUET,
    autodetect=False,
    #decimal_target_types=["STRING"],
    schema=my_schema
)
uri = "gs://{bucket_name}/{file_name}".format(bucket_name=bucket_name, file_name=file_name)

load_job = client.load_table_from_uri(
    uri, table_id, job_config=job_config
)  


load_job.result()  # Waits for the job to complete.

destination_table = client.get_table(table_id)
print("Loaded {} rows.".format(destination_table.num_rows))


BadRequest: 400 Provided Schema does not match Table turnkey-charter-358922:parquet_test.parquet_modified. Field salary has changed type from STRING to NUMERIC

Wow. Numerical Column can't be converted into String directly. 

OK. The 'type' of 'salary' would be 'NUMERIC' type instead of 'DOUBLE'

In [44]:
table_id = "parquet_test.parquet_numeric"

my_schema = [
  {
    "name": "registration_dttm",
    "type": "TIMESTAMP"
  },{
    "name": "id",
    "type": "INTEGER",
    "mode" : "REQUIRED"
  },{
    "name": "first_name",
    "type": "STRING"
  },{
    "name": "last_name",
    "type": "STRING"
  },{
    "name": "email",
    "type": "STRING"
  },{
    "name": "gender",
    "type": "STRING"
  },{
    "name": "ip_address",
    "type": "STRING"
  },{
    "name": "cc",
    "type": "STRING"
  },{
    "name": "country",
    "type": "STRING"
  },{
    "name": "birthdate",
    "type": "STRING"
  },{
    "name": "salary",
    "type": "BIGNUMERIC"
  },{
    "name": "title",
    "type": "STRING"
  },{
    "name": "comments",
    "type": "STRING"
  }
]

job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.PARQUET,
    autodetect=False,
    #decimal_target_types=["STRING"],
    schema=my_schema
)
uri = "gs://{bucket_name}/{file_name}".format(bucket_name=bucket_name, file_name=file_name)

load_job = client.load_table_from_uri(
    uri, table_id, job_config=job_config
)  


load_job.result()  # Waits for the job to complete.

destination_table = client.get_table(table_id)
print("Loaded {} rows.".format(destination_table.num_rows))

BadRequest: 400 Error while reading data, error message: Reading column 'salary' of type DOUBLE as BIGNUMERIC type but its logical type is NONE, expecting DECIMAL. File: gs://bigquery_parquet_upload_test_seoul/userdata1.parquet

Wow.. It can't be. 

Infered Column couldn't be configured with LoadJobConfig options. 
In the reference document(https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-parquet#type_conversions), it describe the conversion rule. 

Only 'mode' filed could be changed from 'NULLABLE' to 'REQUIRED'. 
