Google Big Query is a distributed data warehouse built on a serverless architecture . We’ll discuss this framework in class. In this task you’ll upload all Wedge transaction records to Google Big Query. You’ll want to make sure that the column data types are correctly specified and you’ve properly handled the null values. 
The requirements for this task change depending on the grade you’re going for. 
Note: this assignment can be done manually or programmatically. Naturally I’d prefer it be done programmatically so that you get more practice, but that’s not required to get full credit.

1. Clean the data
    a. I need to split on the delimiter
    b. check for a header (and add it if it doesn't have one)
    c. fix the \\N and \N  and NULL values - keep as NULL
    d. Split them into single month dataframes
2. Upload to GBQ
    a. Upload each one as a separate table in a new dataset in my GBQ project


In [1]:
import os
import pandas as pd
import zipfile
from zipfile import ZipFile
import csv
import io
from io import TextIOWrapper
from pandas_gbq import to_gbq
from google.oauth2 import service_account
from google.cloud import bigquery

In [6]:
zip_files = os.listdir("WedgeZipOfZips")

In [3]:
# JSON key stuff
service_path = "C:/Users/vanes/OneDrive/Desktop/Work/MSBA/ADA/wedge_project/"
service_file = 'wedge-project-vw-key.json' # change this to your authentication information  

# The project ID that I created in GBQ 
gbq_proj_id = 'wedge-project-vw'  

# Leave this alone because it's the whole key to my GBQ proj
private_key = service_path + service_file

# Lets me actually acess all my junk 
credentials = service_account.Credentials.from_service_account_file(service_path + service_file)

# And finally we establish our connection
client = bigquery.Client(credentials = credentials, project=gbq_proj_id)

# Tell it where the Zips are located
zip_dir = 'WedgeZipOfZips'

# The pieces needed for the GBQ upload so it goes to the right place
project_id = 'wedge-project-vw'
dataset_id = 'wedge_data'

In [4]:
delimiters = dict()

# Start by reading in all the files again.
for this_zf in zip_files:
    with ZipFile("WedgeZipOfZips/" + this_zf, 'r') as zf:
        zipped_files = zf.namelist()

        for file_name in zipped_files:
            input_file = zf.open(file_name, 'r')
            input_file = io.TextIOWrapper(input_file, encoding="utf-8")

            # Read the first line to detect the delimiter
            first_line = input_file.readline()
            dialect = csv.Sniffer().sniff(sample=first_line, delimiters=[",", ";", "\t"])
            detected_delimiter = dialect.delimiter

            # Check if the detected delimiter is different from ","
            if detected_delimiter != ",":
                # Change the delimiter to ","
                delimiters[file_name] = ","
            else:
                delimiters[file_name] = detected_delimiter

            # Reset the file back to the beginning for further processing
            input_file.seek(0)

            # Now, you can process the file using the appropriate delimiter
            for line in input_file:
                # Process the data rows here
                data = line.strip().split(delimiters[file_name])
                

            print(f"{file_name}: has delimiter: {detected_delimiter}")
            input_file.close()  # tidy up

transArchive_201001_201003.csv: has delimiter: ,
transArchive_201004_201006.csv: has delimiter: ,
transArchive_201007_201009.csv: has delimiter: ,
transArchive_201010_201012.csv: has delimiter: ,
transArchive_201101_201103.csv: has delimiter: ,
transArchive_201104.csv: has delimiter: ,
transArchive_201105.csv: has delimiter: ,
transArchive_201106.csv: has delimiter: ,
transArchive_201107_201109.csv: has delimiter: ,
transArchive_201110_201112.csv: has delimiter: ,
transArchive_201201_201203.csv: has delimiter: ,
transArchive_201201_201203_inactive.csv: has delimiter: ;
transArchive_201204_201206.csv: has delimiter: ,
transArchive_201204_201206_inactive.csv: has delimiter: ;
transArchive_201207_201209.csv: has delimiter: ,
transArchive_201207_201209_inactive.csv: has delimiter: ;
transArchive_201210_201212.csv: has delimiter: ,
transArchive_201210_201212_inactive.csv: has delimiter: ;
transArchive_201301_201303.csv: has delimiter: ,
transArchive_201301_201303_inactive.csv: has delimiter

In [14]:
for this_zf in zip_files :
    with ZipFile("WedgeZipOfZips/" + this_zf,'r') as zf :
        zipped_files = zf.namelist()

        for file_name in zipped_files :
            input_file = zf.open(file_name,'r')
            input_file = io.TextIOWrapper(input_file,encoding="utf-8")
            
            this_delimiter = delimiters[file_name]
            
            for line in input_file :
                print(line.strip().split(this_delimiter))
              #  break

            input_file.close() # tidy up

In [8]:
headers = dict()

for this_zf in zip_files:
    with ZipFile("WedgeZipOfZips/" + this_zf, 'r') as zf:
        zipped_files = zf.namelist()

        for file_name in zipped_files:
            input_file = zf.open(file_name, 'r')
            input_file = io.TextIOWrapper(input_file, encoding="utf-8")

            this_delimiter = delimiters[file_name]

            # Read the first line to check for the header row
            first_line = input_file.readline()

            # Check if the first line is a header row (you can customize this check)
            is_header = any(keyword in first_line for keyword in ['datetime', 'register_no', 'description', 'trans_status', 'quantity'])

            headers[file_name] = is_header

            print(f"File: {file_name}, Has Header: {is_header}")

            input_file.close()  # tidy up



File: transArchive_201001_201003.csv, Has Header: True
File: transArchive_201004_201006.csv, Has Header: True
File: transArchive_201007_201009.csv, Has Header: True
File: transArchive_201010_201012.csv, Has Header: True
File: transArchive_201101_201103.csv, Has Header: True
File: transArchive_201104.csv, Has Header: True
File: transArchive_201105.csv, Has Header: True
File: transArchive_201106.csv, Has Header: True
File: transArchive_201107_201109.csv, Has Header: True
File: transArchive_201110_201112.csv, Has Header: True
File: transArchive_201201_201203.csv, Has Header: True
File: transArchive_201201_201203_inactive.csv, Has Header: True
File: transArchive_201204_201206.csv, Has Header: True
File: transArchive_201204_201206_inactive.csv, Has Header: True
File: transArchive_201207_201209.csv, Has Header: True
File: transArchive_201207_201209_inactive.csv, Has Header: True
File: transArchive_201210_201212.csv, Has Header: True
File: transArchive_201210_201212_inactive.csv, Has Header: 

In [4]:
# Define your Google BigQuery project and dataset information
project_id = 'wedge-project-vw'
dataset_id = 'wedge_data'

# Initialize the BigQuery client
#client = bigquery.Client(project=project_id)

# List of headers you want to add (modify as needed)
new_headers = ["datetime", "register_no", "emp_no", "trans_no", "upc", "description", "trans_type", "trans_subtype", "trans_status", "department", "quantity", "Scale", "cost", "unitPrice", "total", "regPrice", "altPrice", "tax", "taxexempt", "foodstamp", "wicable", "discount", "memDiscount", "discountable", "discounttype", "voided", "percentDiscount", "ItemQtty", "volDiscType", "volume", "VolSpecial", "mixMatch", "matched", "memType", "staff", "numflag", "itemstatus", "tenderstatus", "charflag", "varflag", "batchHeaderID", "local", "organic", "display", "receipt", "card_no", "store", "branch", "match_id", "trans_id"]

# Store the modified files for BigQuery upload
modified_files = []

# Define the delimiters dictionary
delimiters = {}

# Start by reading in all the files again.
for this_zf in zip_files:
    with ZipFile("WedgeZipOfZips/" + this_zf, 'r') as zf:
        zipped_files = zf.namelist()

        for file_name in zipped_files:
            input_file = zf.open(file_name, 'r')
            input_file = io.TextIOWrapper(input_file, encoding="utf-8")

            # Read the first line to detect the delimiter
            first_line = input_file.readline()
            dialect = csv.Sniffer().sniff(sample=first_line, delimiters=[",", ";", "\t"])
            detected_delimiter = dialect.delimiter

            # Check if the detected delimiter is different from ","
            if detected_delimiter != ",":
                # Change the delimiter to ","
                delimiters[file_name] = ","
            else:
                delimiters[file_name] = detected_delimiter

            # Reset the file back to the beginning for further processing
            input_file.seek(0)

            # Read the first line to check for the header row
            first_line = input_file.readline()
            is_header = any(keyword in first_line for keyword in new_headers)

            if not is_header:
                # If header is missing, add the new headers
                modified_file = ["",""] + new_headers + ["\n"]
                modified_file.append(first_line)
            else:
                modified_file = []

            # Now, you can process the rest of the file, adding or replacing values as needed
            this_delimiter = delimiters[file_name]
            for line in input_file:
                line = line.strip()
                if r'\\N' in line or r'\N' in line:
                    line = line.replace(r'\\N', 'NULL').replace(r'\N', 'NULL')
                modified_file.append(line)
                modified_file.append("\n")

            input_file.close()  # tidy up

            modified_files.append((file_name, modified_file))

# Now you have a list of modified files to upload to BigQuery
for file_name, modified_file in modified_files:
    # Upload the modified file to Google BigQuery
    table_id = f"{project_id}.{dataset_id}.{file_name.split('.')[0]}"
    
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.CSV,
        skip_leading_rows=1,  # Skip the header row
        autodetect=True,  # Auto-detect schema
    )
    
    with io.StringIO(''.join(modified_file)) as modified_file_io:
        load_job = client.load_table_from_file(modified_file_io, table_id, job_config=job_config)
    
    load_job.result()  # Wait for the job to complete

    print(f"File {file_name} has been uploaded to BigQuery table {table_id}.")


File transArchive_201001_201003.csv has been uploaded to BigQuery table wedge-project-vw.wedge_data.transArchive_201001_201003.
File transArchive_201004_201006.csv has been uploaded to BigQuery table wedge-project-vw.wedge_data.transArchive_201004_201006.
File transArchive_201007_201009.csv has been uploaded to BigQuery table wedge-project-vw.wedge_data.transArchive_201007_201009.


In [5]:
# List of headers you want to add (modify as needed)
new_headers = ["datetime", "register_no", "emp_no", "trans_no", "upc", "description", "trans_type", "trans_subtype", "trans_status", "department", "quantity", "Scale", "cost", "unitPrice", "total", "regPrice", "altPrice", "tax", "taxexempt", "foodstamp", "wicable", "discount", "memDiscount", "discountable", "discounttype", "voided", "percentDiscount", "ItemQtty", "volDiscType", "volume", "VolSpecial", "mixMatch", "matched", "memType", "staff", "numflag", "itemstatus", "tenderstatus", "charflag", "varflag", "batchHeaderID", "local", "organic", "display", "receipt", "card_no", "store", "branch", "match_id", "trans_id"]

# Store the modified files for BigQuery upload
modified_files = []

# Define the delimiters dictionary
delimiters = {}

# Start by reading in all the files again.
for this_zf in zip_files:
    with ZipFile("WedgeZipOfZips/" + this_zf, 'r') as zf:
        zipped_files = zf.namelist()

        for file_name in zipped_files:
            input_file = zf.open(file_name, 'r')
            input_file = io.TextIOWrapper(input_file, encoding="utf-8")

            # Read the first line to detect the delimiter
            first_line = input_file.readline()
            dialect = csv.Sniffer().sniff(sample=first_line, delimiters=[",", ";", "\t"])
            detected_delimiter = dialect.delimiter

            # Check if the detected delimiter is different from ","
            if detected_delimiter != ",":
                # Change the delimiter to ","
                delimiters[file_name] = ","
            else:
                delimiters[file_name] = detected_delimiter

            # Reset the file back to the beginning for further processing
            input_file.seek(0)

            # Read the first line to check for the header row
            first_line = input_file.readline()
            is_header = any(keyword in first_line for keyword in new_headers)

            if not is_header:
                # If header is missing, add the new headers
                modified_file = [",".join(new_headers) + "\n"]
            else:
                modified_file = [first_line]

            # Now, you can process the rest of the file, adding or replacing values as needed
            this_delimiter = delimiters[file_name]
            for line in input_file:
                line = line.strip()
                if r'\\N' in line or r'\N' in line:
                    line = line.replace(r'\\N', 'NULL').replace(r'\N', 'NULL')
                modified_file.append(line)

            input_file.close()  # tidy up

            modified_files.append((file_name, modified_file))

# Now you have a list of modified files to upload to BigQuery
for file_name, modified_file in modified_files:
    # Upload the modified file to Google BigQuery
    table_id = f"{project_id}.{dataset_id}.{file_name.split('.')[0]}"
    
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.CSV,
        skip_leading_rows=0,  # No need to skip the header row
        autodetect=True,  # Auto-detect schema
    )
    
    with io.StringIO(''.join(modified_file)) as modified_file_io:
        load_job = client.load_table_from_file(modified_file_io, table_id, job_config=job_config)
    
    load_job.result()  # Wait for the job to complete

    print(f"File {file_name} has been uploaded to BigQuery table {table_id}.")


BadRequest: 400 Number of columns 65535 is too many for table 'transArchive_201001_201003_ed8d6c91_b462_4571_8c75_698e041c4105_source'. A table must have no more than 10000 columns.