In [11]:
# Download Libraries
!pip3 install google-cloud-storage
!pip3 install pyarrow # Apache Arrow

Defaulting to user installation because normal site-packages is not writeable
Defaulting to user installation because normal site-packages is not writeable


In [35]:
# Import Libraries
import pandas as pd
import numpy as np
import json
import requests
import zipfile
import io
from io import StringIO
import pyarrow as pa
import pyarrow.parquet as pq
from google.cloud import storage
import os
import datetime
from google.cloud import bigquery
import unicodedata


In [8]:
#Gathering Data
#Extracting from datasource to view the head 

URL = 'https://data.cityofnewyork.us/api/views/vx8i-nprf/rows.csv?accessType=DOWNLOAD'

# Define the data type for column 17 because it contains mixed data types including NaN
dtype_mapping = {'Veteran Credit': 'object'}

# Specify NaN values to be considered as missing values
na_values = ['NaN', '', 'NA', 'nan']

df_raw = pd.read_csv(URL, dtype=dtype_mapping, na_values=na_values, low_memory=False)

print(df_raw.info())
print('\n')
print(df_raw.shape)
print('\n')
df_raw.head()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 491819 entries, 0 to 491818
Data columns (total 20 columns):
 #   Column              Non-Null Count   Dtype  
---  ------              --------------   -----  
 0   Exam No             491819 non-null  int64  
 1   List No             491819 non-null  float64
 2   First Name          491812 non-null  object 
 3   MI                  272572 non-null  object 
 4   Last Name           491811 non-null  object 
 5   Adj. FA             491819 non-null  float64
 6   List Title Code     491819 non-null  int64  
 7   List Title Desc     491819 non-null  object 
 8   Group No            491819 non-null  int64  
 9   List Agency Code    491819 non-null  int64  
 10  List Agency Desc    491819 non-null  object 
 11  List Div Code       0 non-null       float64
 12  Published Date      223254 non-null  object 
 13  Established Date    477619 non-null  object 
 14  Anniversary Date    477619 non-null  object 
 15  Extension Date      227556 non-nul

Unnamed: 0,Exam No,List No,First Name,MI,Last Name,Adj. FA,List Title Code,List Title Desc,Group No,List Agency Code,List Agency Desc,List Div Code,Published Date,Established Date,Anniversary Date,Extension Date,Veteran Credit,Parent Lgy Credit,Sibling Lgy Credit,Residency Credit
0,9618,1214.0,GERMAN,A,SOSA,86.67,91203,BUS OPERATOR,0,0,OPEN COMPETITIVE,,,09/08/2021,09/08/2025,,,,,
1,7001,24935.0,DARREN,L,PAYNE,91.0,70310,FIREFIGHTER,0,0,OPEN COMPETITIVE,,06/13/2018,02/27/2019,02/27/2023,02/27/2025,,,,Residency Credit
2,7001,11653.0,STEPHEN,,MANFRE,96.0,70310,FIREFIGHTER,0,0,OPEN COMPETITIVE,,06/13/2018,02/27/2019,02/27/2023,02/27/2025,,,,Residency Credit
3,2060,15669.0,ELIJAH,T,RICHARDSON,95.71,70112,SANITATION WORKER,0,0,OPEN COMPETITIVE,,07/12/2023,02/28/2024,02/28/2028,,,,,Residency Credit
4,6601,16886.0,JAY,M,AMES,88.75,91207,CONDUCTOR,0,0,OPEN COMPETITIVE,,,02/14/2018,02/14/2022,02/14/2025,,,,


In [8]:
#Storing Data
# Specify the path to your service account key file
service_account_key_file = '/Users/karmayangchentenzin/Downloads/service_acc_key.json'

# Optionally, load other configuration settings from the JSON file
with open(service_account_key_file, 'r') as f:
    config = json.load(f)

# Extracting data from datasource to view the head
URL = 'https://data.cityofnewyork.us/api/views/vx8i-nprf/rows.csv?accessType=DOWNLOAD'

# Define the data type for column 17 because it contains mixed data types including NaN
dtype_mapping = {'Veteran Credit': 'object'}

# Specify NaN values to be considered as missing values
na_values = ['NaN', '', 'NA', 'nan']

df_raw = pd.read_csv(URL, dtype=dtype_mapping, na_values=na_values, low_memory=False)

print(df_raw.info())
print('\n')
print(df_raw.shape)
print('\n')
df_raw.head()

# Fetch data from the web (CSV file)
url = 'https://data.cityofnewyork.us/api/views/vx8i-nprf/rows.csv?accessType=DOWNLOAD'

# Read the contents of the CSV file
response = requests.get(url)

# Check if the request was successful
if response.status_code != 200:
    print("Failed to fetch CSV file.")
    exit()

# Read the CSV data into a DataFrame
try: 
    df = pd.read_csv(io.StringIO(response.text), dtype=dtype_mapping, na_values=na_values, low_memory=False)

    # Add timestamp column
    df['load_date'] = datetime.datetime.now()

    # Replace periods with underscores in column names #This had to be done before the original data was loaded from the source to GC because when creating tables in BigQuery after loading the parquet file to GC, an error message was shown saying that field names can not contain a period (.)
    df.columns = df.columns.str.replace('.', '_')

    # Convert DataFrame to PyArrow Table
    table = pa.Table.from_pandas(df)

    # Write PyArrow Table to Parquet format
    parquet_file_name = 'ny_civil_service_exam.parquet'  # name for the Parquet file
    pq.write_table(table, parquet_file_name)

    # Upload Parquet file to Google Cloud Storage
    bucket_name = 'cis4400_hw1_kyt'
    blob_name = parquet_file_name

    client = storage.Client.from_service_account_json(service_account_key_file)
    bucket = client.bucket(bucket_name)
    blob = bucket.blob(blob_name)
    blob.upload_from_filename(parquet_file_name)

    print(f"Parquet file '{parquet_file_name}' uploaded to {bucket_name}/{blob_name} in Google Cloud Storage.")
except Exception as e:
    print(f"Error occurred: {e}")




<class 'pandas.core.frame.DataFrame'>
RangeIndex: 491819 entries, 0 to 491818
Data columns (total 20 columns):
 #   Column              Non-Null Count   Dtype  
---  ------              --------------   -----  
 0   Exam No             491819 non-null  int64  
 1   List No             491819 non-null  float64
 2   First Name          491812 non-null  object 
 3   MI                  272572 non-null  object 
 4   Last Name           491811 non-null  object 
 5   Adj. FA             491819 non-null  float64
 6   List Title Code     491819 non-null  int64  
 7   List Title Desc     491819 non-null  object 
 8   Group No            491819 non-null  int64  
 9   List Agency Code    491819 non-null  int64  
 10  List Agency Desc    491819 non-null  object 
 11  List Div Code       0 non-null       float64
 12  Published Date      223254 non-null  object 
 13  Established Date    477619 non-null  object 
 14  Anniversary Date    477619 non-null  object 
 15  Extension Date      227556 non-nul

In [14]:
# Display DataFrame with added timestamp column
print(df.head())

# Verify Parquet file contents
parquet_table = pq.read_table(parquet_file_name)
parquet_df = parquet_table.to_pandas()
print(parquet_df.head())

   Exam No  List No First Name   MI   Last Name  Adj_ FA  List Title Code  \
0     9618   1214.0     GERMAN    A        SOSA    86.67            91203   
1     7001  24935.0     DARREN    L       PAYNE    91.00            70310   
2     7001  11653.0    STEPHEN  NaN      MANFRE    96.00            70310   
3     2060  15669.0     ELIJAH    T  RICHARDSON    95.71            70112   
4     6601  16886.0        JAY    M        AMES    88.75            91207   

     List Title Desc  Group No  List Agency Code  ... List Div Code  \
0       BUS OPERATOR         0                 0  ...           NaN   
1        FIREFIGHTER         0                 0  ...           NaN   
2        FIREFIGHTER         0                 0  ...           NaN   
3  SANITATION WORKER         0                 0  ...           NaN   
4          CONDUCTOR         0                 0  ...           NaN   

   Published Date Established Date Anniversary Date Extension Date  \
0             NaN       09/08/2021      

In [22]:


# Initialize a client using the default credentials
client = storage.Client(project="avian-silicon-418821")

# List buckets
buckets = list(client.list_buckets())

# Print bucket names
for bucket in buckets:
    print(bucket.name)


cis4400_hw1_kyt


In [32]:
# Read Parquet file into DataFrame
parquet_file_path = 'gs://cis4400_hw1_kyt/ny_civil_service_exam.parquet'
df = pd.read_parquet(parquet_file_path)

# Print DataFrame contents
print("DataFrame Contents:")
print(df)


DataFrame Contents:
        Exam No  List No   First Name    MI   Last Name  Adj_ FA  \
0          9618   1214.0       GERMAN     A        SOSA    86.67   
1          7001  24935.0       DARREN     L       PAYNE    91.00   
2          7001  11653.0      STEPHEN  None      MANFRE    96.00   
3          2060  15669.0       ELIJAH     T  RICHARDSON    95.71   
4          6601  16886.0          JAY     M        AMES    88.75   
...         ...      ...          ...   ...         ...      ...   
491814      162    640.0        KARIM  None      NUGENT    94.00   
491815      320   3127.0      NICOLAS     C        PUMA    93.61   
491816     2027   4724.0       DILCIA  None       TAPIA    82.44   
491817     2095    291.0  MD ABUBAKAR  None      SIDDIK    70.00   
491818     2545     53.0         LUIS     M       GOMEZ    70.60   

        List Title Code                                 List Title Desc  \
0                 91203                                    BUS OPERATOR   
1            

In [36]:


# Set up BigQuery client
client = bigquery.Client(project="avian-silicon-418821")

# Function to load data into BigQuery table
def load_data_to_bigquery(data_frame, table_id, dataset_id):
    table_ref = client.dataset(dataset_id).table(table_id)
    job_config = bigquery.LoadJobConfig(
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
    )
    client.load_table_from_dataframe(data_frame, table_ref, job_config=job_config).result()

# Define dataset ID
dataset_id = 'cis4400_hw1'

# Read Parquet file into DataFrame
parquet_file_path = 'gs://cis4400_hw1_kyt/ny_civil_service_exam.parquet'
df = pd.read_parquet(parquet_file_path)



# Normalize text data to remove non-ASCII characters
def normalize_text(text):
    return ''.join(char for char in unicodedata.normalize('NFKD', text) if not unicodedata.combining(char))

# Convert data types from 'object' to 'string' and normalize text
df = df.applymap(lambda x: normalize_text(str(x)))

# Select specific columns for dim_date table
dim_date_df = df[['Established Date', 'Extension Date', 'Published Date', 'Anniversary Date']]

# Load data into the dim_date table in BigQuery
load_data_to_bigquery(dim_date_df, 'dim_date', dataset_id)


# Define the mapping of DataFrame columns to BigQuery table names
table_mappings = {
    #'dim_date': ['Published Date', 'Established Date', 'Anniversary Date', 'Extension Date'],
    #'dim_exam': ['Exam No'],
    #'dim_list': ['List No', 'Group No', 'List Div Code'],
    #'dim_list_title': ['List Title Code', 'List Title Desc'],
    #'dim_list_agency': ['List Agency Code', 'List Agency Desc'],
    'dim_candidate_info': ['First Name', 'MI', 'Last Name'],
    #'dim_residency_info': ['Residency Credit'],
    #'dim_parent_lgy_info': ['Parent Lgy Credit'],
    #'dim_sibling_lgy_credit_info': ['Sibling Lgy Credit'],
    #'dim_veteran_credit_info': ['Veteran Credit']
}

# Load data into corresponding BigQuery tables
for table_id, columns in table_mappings.items():
    data_frame = df[columns]
    load_data_to_bigquery(data_frame, table_id, dataset_id)

print('Data loaded successfully into BigQuery tables.')


  df = df.applymap(lambda x: normalize_text(str(x)))


Data loaded successfully into BigQuery tables.


In [38]:
# Set up BigQuery client
client = bigquery.Client(project="avian-silicon-418821")

# Function to load data into BigQuery table
def load_data_to_bigquery(data_frame, table_id, dataset_id):
    table_ref = client.dataset(dataset_id).table(table_id)
    job_config = bigquery.LoadJobConfig(
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
    )
    client.load_table_from_dataframe(data_frame, table_ref, job_config=job_config).result()

# Define dataset ID
dataset_id = 'cis4400_hw1'

# Read Parquet file into DataFrame
parquet_file_path = 'gs://cis4400_hw1_kyt/ny_civil_service_exam.parquet'
df = pd.read_parquet(parquet_file_path)

# Select specific columns for dim_candidate_info table
dim_candidate_df = df[['First Name', 'MI', 'Last Name']]

# Load data into the dim_candidate_info table in BigQuery
load_data_to_bigquery(dim_candidate_df, 'dim_candidate_info', dataset_id)

print('Data loaded successfully into the dim_candidate_info table in BigQuery.')


Data loaded successfully into the dim_candidate_info table in BigQuery.
