<a href="https://colab.research.google.com/github/j-buss/wi-dpi-analysis/blob/development/eda/2.0_Landing.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Salary and Education in Wisconsin - 2.0 Load Landing BigQuery

This notebook is part of an analysis of teacher salaries reported by the Wisconsin Department of Public Instruction. This step loads the raw files stored in Google Cloud Storage into the BigQuery landing dataset.

## Introduction

### Load libraries
Install the following packages in order to load data to BigQuery.

*Please note: the runtime must be restarted after these packages are installed.*

In [0]:
!pip install --upgrade google-cloud-bigquery
!pip install gcsfs
!pip install pandas-gbq -U

### Authenticate to Google Cloud

In [0]:
# Colab-specific helper module (this notebook is meant to be opened in Colab).
from google.colab import auth
# NOTE(review): presumably this supplies the credentials that the BigQuery and
# Cloud Storage clients created further down rely on — confirm.
auth.authenticate_user()

### Import Libraries

In [0]:
import numpy as np
import pandas as pd
pd.set_option('display.max_columns', 50)
pd.set_option('display.max_rows', 5)
import seaborn as sns
import matplotlib.pyplot as plt


In [0]:
# Render matplotlib figures inline in the notebook output.
%matplotlib inline
# Select matplotlib's 'bmh' style sheet.
plt.style.use('bmh')

In [0]:
from google.cloud import bigquery
from google.cloud import storage
from io import StringIO
import re
import collections

### Functions

In [0]:
def download_file(url, filename):
  """Download the resource at ``url`` and save its body to ``filename``.

  Args:
    url: Address of the resource to fetch.
    filename: Local path to write in binary mode; an existing file is
      overwritten.

  Raises:
    requests.HTTPError: If the server answers with a 4xx/5xx status.
  """
  # `requests` is not among the notebook's import cells, so it is imported
  # here; without this the function fails with a NameError on first use.
  import requests

  r = requests.get(url)
  # Fail loudly instead of saving an HTTP error page as if it were the data.
  r.raise_for_status()
  # The context manager closes the file even if the write raises.
  with open(filename, 'wb') as f:
    f.write(r.content)

In [0]:
def create_dataset(client, project_id, dataset_name):
  """Create the BigQuery dataset ``project_id.dataset_name`` in the US location.

  The call is safe to repeat: if the dataset already exists it is left as it
  is, so re-running the notebook does not fail on this step.

  Args:
    client: ``bigquery.Client`` used to issue the request.
    project_id: Project that owns the dataset.
    dataset_name: Name of the dataset inside the project.
  """
  dataset_id = "{}.{}".format(project_id, dataset_name)
  dataset = bigquery.Dataset(dataset_id)
  dataset.location = "US"

  # exists_ok=True is the counterpart of not_found_ok=True in delete_dataset:
  # an already-existing dataset is not treated as an error.
  client.create_dataset(dataset, exists_ok=True)

In [0]:
def delete_dataset(client, project_id, dataset_name):
  """Delete the dataset ``project_id.dataset_name`` through ``client``.

  The request is sent with ``delete_contents=True`` and ``not_found_ok=True``.

  Args:
    client: Object exposing ``delete_dataset`` (a ``bigquery.Client``).
    project_id: Project that owns the dataset.
    dataset_name: Name of the dataset inside the project.
  """
  target = "{}.{}".format(project_id, dataset_name)
  options = {"not_found_ok": True, "delete_contents": True}
  client.delete_dataset(target, **options)

In [0]:
def return_blob_list(project_id, bucket_name):
    """Return the blob listing of ``bucket_name``.

    A new storage client is created for ``project_id`` on every call, and the
    result of ``bucket.list_blobs()`` is returned unchanged for the caller to
    iterate over.
    """
    bucket = storage.Client(project=project_id).get_bucket(bucket_name)
    return bucket.list_blobs()

In [0]:
def clean_column_headers(columns):
  """Normalize column labels into lowercase, underscore-separated names.

  Each label is stripped of surrounding whitespace and lowercased; spaces,
  dots and slashes become underscores; parentheses and apostrophes are
  removed.

  Args:
    columns: A pandas Index (or Series) of string labels, e.g. ``df.columns``.

  Returns:
    An object of the same kind holding the cleaned labels.
  """
  # A single translation pass treats every character literally. Chained
  # .str.replace calls on '(', ')' and '.' depend on the pandas version's
  # default for `regex`, and those characters are regex metacharacters.
  table = str.maketrans({
      ' ': '_', '.': '_', '/': '_',
      '(': None, ')': None, "'": None,
  })
  return columns.str.strip().str.lower().str.translate(table)

### Define Values

In [0]:
# Google Cloud project passed to the BigQuery and Cloud Storage clients.
project_id='wi-dpi-010'
# Cloud Storage bucket the source files are read from.
raw_data_bucket_name='landing-009'

# BigQuery dataset that receives one table per loaded file.
landing_dataset_name='landing'
# Name of a second dataset; not referenced by the cells shown in this notebook.
refined_dataset_name='refined'

## Data Preparation

### Create Dataset

In [0]:
# landing_dataset_name is already set in the "Define Values" cell, so it is not
# assigned a second time here; that keeps a single place to change it.
bq_client = bigquery.Client(project=project_id)

In [0]:
# Create the landing dataset that the tables below are loaded into.
create_dataset(bq_client, project_id, landing_dataset_name)

In [0]:
# Cleanup (disabled): uncomment the call below to delete the landing dataset.
#delete_dataset(bq_client, project_id, landing_dataset_name)

### Create Dataframes and Tables from GCS Blobs

For all files (blobs) in the bucket we will split the blob name into the following components:


1.   Source
2.   Year
3.   File
4.   File Type

Additionally, we will use a `namedtuple` to hold these components, together with the blob's full name, so that each one can be accessed by name.

In [0]:
# Record type for one parsed blob name: the four path components plus the
# blob's complete name.
blob_to_process = collections.namedtuple(
    typename='blob_to_process',
    field_names='source year file file_type fullname',
)

In [0]:
# Parse every blob name in the raw-data bucket into its components. The project
# and bucket come from the "Define Values" cell instead of being repeated here
# as string literals, so changing them there is enough.
list_of_blobs = []
for blob in return_blob_list(project_id, raw_data_bucket_name):
  # e.g. 'all_staff_report/1995/1995_degree.csv'
  #   -> ['all_staff_report', '1995', '1995_degree', 'csv']
  name_parts = re.findall(r"[\w']+", blob.name)
  # Names that do not split into exactly source/year/file/file_type are skipped.
  if len(name_parts) == 4:
    list_of_blobs.append(blob_to_process(*name_parts, blob.name))

In [14]:
# Print the parsed blob descriptors.
print (list_of_blobs)

[blob_to_process(source='all_staff_report', year='1995', file='1995_all_staff_report', file_type='fwf', fullname='all_staff_report/1995/1995_all_staff_report.fwf'), blob_to_process(source='all_staff_report', year='1995', file='1995_all_staff_report', file_type='metadata', fullname='all_staff_report/1995/1995_all_staff_report.metadata'), blob_to_process(source='all_staff_report', year='1995', file='1995_degree', file_type='csv', fullname='all_staff_report/1995/1995_degree.csv'), blob_to_process(source='all_staff_report', year='1995', file='1995_grade_code', file_type='csv', fullname='all_staff_report/1995/1995_grade_code.csv'), blob_to_process(source='all_staff_report', year='1995', file='1995_race', file_type='csv', fullname='all_staff_report/1995/1995_race.csv'), blob_to_process(source='all_staff_report', year='1995', file='1995_salary_fund_source', file_type='csv', fullname='all_staff_report/1995/1995_salary_fund_source.csv'), blob_to_process(source='all_staff_report', year='1995', f

Now 
`list_of_blobs`
holds the parsed components of every blob. We will loop over it to create the corresponding tables in BigQuery.

In [0]:
# Open the raw-data bucket once; the load loop below fetches each blob from it.
storage_client = storage.Client(project=project_id)
bucket = storage_client.get_bucket(raw_data_bucket_name)

In [0]:
# Pair each fixed-width data file with the blob that describes its layout.
# Keying on (source, year, file) and requiring file_type == 'metadata' ensures
# that a differently-typed blob sharing the same file name (for example a .csv)
# is never mistaken for the layout file.
metadata_by_key = {
    (b.source, b.year, b.file): b.fullname
    for b in list_of_blobs if b.file_type == 'metadata'
}

for blob in list_of_blobs:
  if blob.file_type not in ('csv', 'fwf'):
    continue  # e.g. .metadata blobs, which are read together with their .fwf file

  data_blob = bucket.get_blob(blob.fullname)
  data = data_blob.download_as_string()

  if blob.file_type == 'csv':
    df = pd.read_csv(StringIO(data.decode('utf-8')),low_memory=False)
  else:
    blob_key = (blob.source, blob.year, blob.file)
    if blob_key not in metadata_by_key:
      # Name the offending file instead of failing with a bare IndexError.
      raise ValueError('No .metadata blob found for ' + blob.fullname)
    metadata_blob = bucket.get_blob(metadata_by_key[blob_key])
    metadata = metadata_blob.download_as_string()
    # The metadata blob is read as a CSV; its 'length' and 'name' columns give
    # the width and the name of each fixed-width field.
    metadata_df = pd.read_csv(StringIO(metadata.decode('utf-8')))
    # read_fwf is given plain lists rather than pandas Series.
    col_widths = metadata_df['length'].astype(int).tolist()
    col_names = metadata_df['name'].tolist()
    df = pd.read_fwf(StringIO(data.decode('utf-8')), widths=col_widths, names=col_names)

  # Shared tail for both formats: normalize headers, then (re)load the table.
  df.columns = clean_column_headers(df.columns)
  table_id = landing_dataset_name + '.' + blob.file
  print (table_id)
  df.to_gbq(table_id,project_id=project_id,if_exists='replace')
    

  return _read(filepath_or_buffer, kwds)


landing.1995_all_staff_report


1it [00:06,  6.28s/it]


landing.1995_degree


1it [00:02,  2.45s/it]


landing.1995_grade_code


1it [00:02,  2.88s/it]


landing.1995_race


1it [00:05,  5.10s/it]


landing.1995_salary_fund_source


1it [00:03,  3.59s/it]


landing.1995_school_type


1it [00:02,  2.86s/it]


landing.1995_staff_type


1it [00:09,  9.56s/it]


landing.1995_work_agency_type


1it [00:02,  2.65s/it]
  return _read(filepath_or_buffer, kwds)


landing.1996_all_staff_report


1it [00:05,  5.43s/it]


landing.2015_agency_type


1it [00:04,  4.35s/it]


landing.2015_all_staff_report


1it [00:05,  5.18s/it]


landing.2015_assignment_area


1it [00:04,  4.20s/it]


landing.2015_grade_level


1it [00:02,  2.88s/it]


landing.2015_highest_educational_degree


1it [00:02,  2.59s/it]


landing.2015_low_high_grade_level


1it [00:04,  4.92s/it]


landing.2015_position_type


1it [00:05,  5.49s/it]


landing.2015_positions


1it [00:02,  2.98s/it]


landing.2015_race


1it [00:02,  2.62s/it]


landing.2015_staff_category


1it [00:05,  5.51s/it]
