In [1]:
import os
FOLDER_NAME = "LCAProgramsH1BH1B1E3"
YEAR_MONTH = "2019-10"  # pinned snapshot; for the current month use datetime.today().strftime("%Y-%m")
# All data lives under the us-visa-data folder unless the DOWNLOAD_DIR environment variable overrides it
DOWNLOAD_DIR = os.getenv('DOWNLOAD_DIR', '../us-visa-data')
PARQUET_DIR = "/".join([DOWNLOAD_DIR, FOLDER_NAME, YEAR_MONTH, "downloads", "parquet"])
PARQUET_MASTER_SCHEMA = "/".join([PARQUET_DIR, "master_schema"])

# Reference list of the master-schema columns
MASTER_SCHEMA = [
    # case
    'YEAR_CASE_BELONGS_TO', 'CASE_NUMBER', 'CASE_STATUS', 'CASE_SUBMITTED', 'DECISION_DATE',
    'VISA_CLASS', 'EMPLOYMENT_START_DATE', 'EMPLOYMENT_END_DATE',
    # employer
    'EMPLOYER_NAME', 'EMPLOYER_ADDRESS', 'EMPLOYER_CITY', 'EMPLOYER_STATE', 'EMPLOYER_POSTAL_CODE',
    'EMPLOYER_COUNTRY', 'EMPLOYER_PROVINCE', 'EMPLOYER_PHONE', 'EMPLOYER_PHONE_EXT',
    # agent / attorney
    'AGENT_REPRESENTING_EMPLOYER', 'AGENT_ATTORNEY_NAME', 'AGENT_ATTORNEY_CITY', 'AGENT_ATTORNEY_STATE',
    # job and wages
    'JOB_TITLE', 'SOC_CODE', 'SOC_NAME', 'NAICS_CODE', 'TOTAL_WORKERS', 'FULL_TIME_POSITION',
    'PREVAILING_WAGE', 'PW_UNIT_OF_PAY', 'WAGE_RATE_OF_PAY_FROM', 'WAGE_RATE_OF_PAY_TO',
    'WAGE_UNIT_OF_PAY_FROM', 'WAGE_UNIT_OF_PAY_TO', 'H1B_DEPENDENT', 'WILLFUL_VIOLATOR',
    # worksite
    'WORKSITE_CITY', 'WORKSITE_COUNTY', 'WORKSITE_STATE', 'WORKSITE_POSTAL_CODE',
]

In [None]:
import pandas as pd
import os
# Combine every per-file parquet in PARQUET_MASTER_SCHEMA into one frame.
# Later cells write their outputs ("2001-2019*.snappy.parquet", "2019_CALI_*") into this same
# directory; skip them, otherwise re-running the notebook reads the combined data back in and
# duplicates every row.
_DERIVED_OUTPUT_PREFIXES = ("2001-2019", "2019_CALI_")
# sorted(): os.listdir order is arbitrary, so sorting makes the row order reproducible.
files = sorted(
    f for f in os.listdir(PARQUET_MASTER_SCHEMA)
    if not f.startswith('.') and not f.startswith(_DERIVED_OUTPUT_PREFIXES)
)
dataframes = [pd.read_parquet(os.path.join(PARQUET_MASTER_SCHEMA, file)) for file in files]
# ignore_index=True: each file brings its own 0..n-1 index, which would repeat after concat.
df = pd.concat(dataframes, ignore_index=True)
# Drop the per-file frames so the data is not held in memory twice.
del dataframes

In [None]:
# Count missing values in every column
df.isna().sum()

# PREVAILING_WAGE column

In [None]:
# A bare "." appears in PREVAILING_WAGE. Overwrite only exact matches (not str.replace) so
# that decimal points inside real amounts are left intact.
# NOTE(review): this turns the placeholder into a wage of 0 rather than a missing value.
df.loc[df['PREVAILING_WAGE'].eq("."), 'PREVAILING_WAGE'] = '0'

In [None]:
# Some elements have 20-70, in which case average the value and replace
import re
import statistics
import pandas as pd
def average_if_hyphen(x):
    """Collapse a wage range such as "20-70" to the mean of its end points.

    Null values and values without a hyphen are returned unchanged. Each side
    of the range is parsed as a float after stripping whitespace, "$" and ","
    so "20.5-30.5" and "$20,000-$30,000" are handled too (int() crashed on
    these). If any side cannot be parsed, for example the empty left side of
    a negative number such as "-5", the value is returned unchanged and the
    later pd.to_numeric step decides what to do with it.
    """
    if pd.isnull(x):
        return x
    text = str(x)
    if '-' not in text:
        return x
    try:
        bounds = [float(part.strip().replace('$', '').replace(',', '')) for part in text.split('-')]
    except ValueError:
        return x
    return statistics.mean(bounds)

# Series.apply(convert_dtype=False) is deprecated since pandas 2.1; casting to object first is
# the documented replacement.
df['PREVAILING_WAGE'] = df['PREVAILING_WAGE'].astype(object).apply(average_if_hyphen)

In [None]:
# Spot-check the averaging above: show any PREVAILING_WAGE values that still contain a hyphen.
# (A hard-coded positional lookup such as df.iloc[5913308, 27] breaks whenever the number or
# order of loaded rows changes.)
df.loc[df['PREVAILING_WAGE'].astype(str).str.contains('-', regex=False), 'PREVAILING_WAGE'].head()

In [None]:
# Some elements have $ string, remove that
import re
def replace_dollar(x):
    """Remove every "$" from the text form of x; values without a "$" are returned as-is."""
    text = str(x)
    return text.replace('$', '') if '$' in text else x

# Series.apply(convert_dtype=False) is deprecated since pandas 2.1; casting to object first is
# the documented replacement.
df['PREVAILING_WAGE'] = df['PREVAILING_WAGE'].astype(object).apply(replace_dollar)

In [None]:
# Parse the cleaned strings into numbers; any value still unparseable raises (errors='raise' default)
df['PREVAILING_WAGE'] = df['PREVAILING_WAGE'].pipe(pd.to_numeric)

In [None]:
# From here on, display floats with two decimals in every DataFrame/Series repr.
pd.set_option('display.float_format', '{:.2f}'.format)
# PREVAILING_WAGE is already numeric after the pd.to_numeric cell above, so summarise it
# directly instead of re-parsing the whole column again.
df['PREVAILING_WAGE'].describe()

In [None]:
# Column dtypes and non-null counts (buf=None writes to stdout)
df.info(buf=None)

In [None]:
# Parse submission dates; anything unparseable becomes NaT
df['CASE_SUBMITTED'] = df['CASE_SUBMITTED'].pipe(pd.to_datetime, errors="coerce")

# DECISION_DATE column

In [None]:
# ValueError: ('Unknown string format:', '6/1/2006 2')
import re
def remove_string_after_date(x):
    """Trim trailing junk after an M/D/YYYY date, e.g. '6/1/2006 2' -> '6/1/2006'.

    Only values in which the date is followed by whitespace and a digit are
    changed, and the matched date itself is returned. Null and non-matching
    values are returned unchanged.
    """
    if pd.isnull(x):
        return x
    match = re.search(r"(\d{1,2}/\d{1,2}/\d{4})\s\d", str(x))
    if match:
        # Return the captured date instead of splitting on " ": the pattern accepts any
        # whitespace separator, and split(" ") left e.g. tab-separated values untouched.
        return match.group(1)
    return x

# Series.apply(convert_dtype=False) is deprecated since pandas 2.1; casting to object first is
# the documented replacement.
df['DECISION_DATE'] = df['DECISION_DATE'].astype(object).apply(remove_string_after_date)

In [None]:
# Parse decision dates strictly: an unparseable value raises instead of becoming NaT
df['DECISION_DATE'] = df['DECISION_DATE'].pipe(pd.to_datetime)

# CASE_STATUS column

In [None]:
# Distinct CASE_STATUS spellings before normalisation
df.CASE_STATUS.unique()

In [None]:
# Normalise CASE_STATUS spellings to upper case; e-mail addresses that leaked into the column
# become empty strings.
match_values = ['Certified','Denied','Pending','PENDING QUALITY AND COMPLIANCE REVIEW - UNASSIGNED',
                'Hold','Debarred','roye@fragomen.com','omboko@jacksonlewis.com','aespiritusanto@fragomen.c','mkwok@mltsf.com']
replace_with_values = ['CERTIFIED','DENIED','PENDING','PENDING','HOLD','DEBARRED','','','','']
# Assign the result back: replace(..., inplace=True) on df['CASE_STATUS'] is chained assignment
# and does not update df under pandas Copy-on-Write.
df['CASE_STATUS'] = df['CASE_STATUS'].replace(to_replace=match_values, value=replace_with_values, regex=False)

In [None]:
# Distinct CASE_STATUS values after normalisation
df.CASE_STATUS.unique()

In [None]:
# Store the normalised status labels as a categorical
df['CASE_STATUS'] = df['CASE_STATUS'].astype(dtype='category')

# VISA_CLASS column

In [None]:
# Distinct VISA_CLASS values before decoding
df.VISA_CLASS.unique()

In [None]:
# See H-1B_Record_Layout_FY08.doc
import numpy as np
# Map the single-letter VISA_CLASS codes to visa names; the form placeholder
# 'Select Visa Classification' becomes missing.
match_values = ['R','A','C','S','Select Visa Classification']
replace_with_values = ['H-1B','E-3 Australian','H-1B1 Chile','H-1B1 Singapore',np.nan]
# Assign the result back: replace(..., inplace=True) on df['VISA_CLASS'] is chained assignment
# and does not update df under pandas Copy-on-Write.
df['VISA_CLASS'] = df['VISA_CLASS'].replace(to_replace=match_values, value=replace_with_values, regex=False)

In [None]:
# Distinct VISA_CLASS values after decoding
df.VISA_CLASS.unique()

In [None]:
# Store the decoded visa classes as a categorical
df['VISA_CLASS'] = df['VISA_CLASS'].astype(dtype='category')

In [None]:
# Column dtypes and non-null counts (buf=None writes to stdout)
df.info(buf=None)

# EMPLOYMENT_START_DATE column

In [None]:
# pd.to_datetime raised OutOfBoundsDatetime on '2901-02-04 00:00:00'; year 2901 is treated as a typo for 2019.
def replace_wrong_year(x):
    """Rewrite every '2901' in the text of x as '2019'; nulls and other values pass through unchanged."""
    if not pd.isnull(x) and "2901" in str(x):
        return str(x).replace("2901", "2019")
    return x

# Series.apply(convert_dtype=False) is deprecated since pandas 2.1; casting to object first is
# the documented replacement.
df['EMPLOYMENT_START_DATE'] = df['EMPLOYMENT_START_DATE'].astype(object).apply(replace_wrong_year)

In [None]:
# ValueError : 70/1, df[df['EMPLOYMENT_START_DATE'].astype(str).str.contains('70')]
import re
def replace_date_70(x):
    """Fix a month typed as '70' (meant '07'), e.g. '70/15/2001' -> '07/15/2001'.

    Only the matched '70' month is rewritten. Other occurrences of '70', such as
    the year in '70/15/1970', are left alone; str.replace previously turned that
    into '07/15/1907'. Nulls and values without such a date pass through unchanged.
    """
    if pd.isnull(x):
        return x
    text = str(x)
    # (?<!\d): the '70' must not be the tail of a longer number such as '170/...'
    match = re.search(r"(?<!\d)70/\d{1,2}/\d{4}", text)
    if match is None:
        return x
    start = match.start()
    return text[:start] + "07" + text[start + 2:]

# Series.apply(convert_dtype=False) is deprecated since pandas 2.1; casting to object first is
# the documented replacement.
df['EMPLOYMENT_START_DATE'] = df['EMPLOYMENT_START_DATE'].astype(object).apply(replace_date_70)

In [None]:
# df[df['EMPLOYMENT_START_DATE'].astype(str).str.contains('70')] - Observe that there are 7001 years.
# If you observe the YEAR_CASE_BELONGS_TO, it is clear typo to type 2001
import re
def replace_date_7001(x):
    """Rewrite every '7001' in the text of x as '2001' (typo year); nulls and other values pass through unchanged."""
    if pd.isnull(x):
        return x
    text = str(x)
    return text.replace("7001", "2001") if "7001" in text else x

# Series.apply(convert_dtype=False) is deprecated since pandas 2.1; casting to object first is
# the documented replacement.
df['EMPLOYMENT_START_DATE'] = df['EMPLOYMENT_START_DATE'].astype(object).apply(replace_date_7001)

In [None]:
# '05/19/1270' has an impossible year; use 2001, the YEAR_CASE_BELONGS_TO for that case.
df.loc[df['EMPLOYMENT_START_DATE'].eq("05/19/1270"), 'EMPLOYMENT_START_DATE'] = '05/19/2001'

In [None]:
# '03/70/2001' has day 70; read it as '03/07/2001' (presumably '07' with the digits swapped).
df.loc[df['EMPLOYMENT_START_DATE'].eq("03/70/2001"), 'EMPLOYMENT_START_DATE'] = '03/07/2001'

In [None]:
# Date contained '??/?7/770?' - replace with nan
import numpy as np
# '??/?7/770?' cannot be repaired, so mark it missing.
df.loc[df['EMPLOYMENT_START_DATE'].eq("??/?7/770?"), 'EMPLOYMENT_START_DATE'] = np.nan

In [None]:
# https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.to_datetime.html
# errors="coerce": remaining malformed start dates become NaT instead of being fixed one by one.
df['EMPLOYMENT_START_DATE'] = df['EMPLOYMENT_START_DATE'].pipe(pd.to_datetime, errors="coerce")

# EMPLOYMENT_END_DATE column

In [None]:
# https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.to_datetime.html
# errors="coerce": malformed end dates become NaT instead of being fixed one by one.
df['EMPLOYMENT_END_DATE'] = df['EMPLOYMENT_END_DATE'].pipe(pd.to_datetime, errors="coerce")

In [None]:
# Column dtypes and non-null counts (buf=None writes to stdout)
df.info(buf=None)

# SOC_CODE column

In [None]:
# Number of distinct SOC_CODE values (missing counts as one value)
len(df['SOC_CODE'].unique())

In [None]:
# SOC (Standard Occupational Classification) codes are identifiers written as 'XX-XXXX'
# (often with a '.00' suffix), not numbers. pd.to_numeric(..., errors="coerce") turned every
# hyphenated code into NaN. Keep them as trimmed strings instead; missing values stay missing.
df['SOC_CODE'] = df['SOC_CODE'].where(df['SOC_CODE'].isna(), df['SOC_CODE'].astype(str).str.strip())

# NAICS_CODE column

In [None]:
# Number of distinct NAICS_CODE values (missing counts as one value)
len(df['NAICS_CODE'].unique())

# TOTAL_WORKERS column

In [None]:
# Number of missing TOTAL_WORKERS values (.size returned the column length, not the null count)
df['TOTAL_WORKERS'].isnull().sum()

In [None]:
# Convert head counts to numbers; an unparseable value raises (errors='raise' is the default)
df['TOTAL_WORKERS'] = df['TOTAL_WORKERS'].pipe(pd.to_numeric)

# PW_UNIT_OF_PAY column

In [None]:
# Store the prevailing-wage pay unit as a categorical (inspect with df['PW_UNIT_OF_PAY'].unique())
df['PW_UNIT_OF_PAY'] = df['PW_UNIT_OF_PAY'].astype(dtype='category')

# WAGE_RATE_OF_PAY_FROM column

In [None]:
# A bare "." appears in WAGE_RATE_OF_PAY_FROM. Overwrite only exact matches (not str.replace)
# so that decimal points inside real amounts are left intact.
# NOTE(review): this turns the placeholder into a wage of 0 rather than a missing value.
df.loc[df['WAGE_RATE_OF_PAY_FROM'].eq("."), 'WAGE_RATE_OF_PAY_FROM'] = '0'

In [None]:
# Repair two malformed WAGE_RATE_OF_PAY_FROM strings that made float conversion fail:
#   ValueError: could not convert string to float: '3800\t.00'  -> '3800.00'  (stray tab)
#   ValueError: could not convert string to float: '42~000.00'  -> '42000.00'
# The '~' presumably stands in for the thousands separator. The previous replacement '42'
# dropped three orders of magnitude. NOTE(review): confirm against WAGE_UNIT_OF_PAY_FROM for that row.
df.loc[df['WAGE_RATE_OF_PAY_FROM'] == "3800\t.00", 'WAGE_RATE_OF_PAY_FROM'] = '3800.00'
df.loc[df['WAGE_RATE_OF_PAY_FROM'] == "42~000.00", 'WAGE_RATE_OF_PAY_FROM'] = '42000.00'

In [None]:
# Values like '1110???.??' remain; errors="coerce" turns every unparseable wage into NaN.
df['WAGE_RATE_OF_PAY_FROM'] = df['WAGE_RATE_OF_PAY_FROM'].pipe(pd.to_numeric, errors="coerce")

# WAGE_RATE_OF_PAY_TO column

In [None]:
# Distinct raw WAGE_RATE_OF_PAY_TO values
df.WAGE_RATE_OF_PAY_TO.unique()

In [None]:
# Unparseable upper wage bounds become NaN
df['WAGE_RATE_OF_PAY_TO'] = df['WAGE_RATE_OF_PAY_TO'].pipe(pd.to_numeric, errors="coerce")

# WAGE_UNIT_OF_PAY_FROM column

In [None]:
# https://flcdatacenter.com/CaseH1B.aspx
import numpy as np
# Map long unit names to one-letter codes; the form placeholder becomes missing.
match_values = ['Year','Hour','Month','Week','2 weeks','Select Pay Range']
replace_with_values = ['Y','H','M','W','B',np.nan]
# Assign the result back: replace(..., inplace=True) on a column is chained assignment and does
# not update df under pandas Copy-on-Write.
df['WAGE_UNIT_OF_PAY_FROM'] = df['WAGE_UNIT_OF_PAY_FROM'].replace(to_replace=match_values, value=replace_with_values, regex=False)

In [None]:
# https://flcdatacenter.com/CaseH1B.aspx
import numpy as np
# Map abbreviations to one-letter codes; wage amounts that leaked into the unit column are
# treated as yearly.
match_values = ['YR','HR','MTH','WK','BI','Bi-Weekly','82,817.00','75,045.00','138,000.00','54,000.00','64,000.00']
replace_with_values = ['Y','H','M','W','B','B','Y','Y','Y','Y','Y']
# Assign the result back: replace(..., inplace=True) on a column is chained assignment and does
# not update df under pandas Copy-on-Write.
df['WAGE_UNIT_OF_PAY_FROM'] = df['WAGE_UNIT_OF_PAY_FROM'].replace(to_replace=match_values, value=replace_with_values, regex=False)

In [None]:
# https://flcdatacenter.com/CaseH1B.aspx
# How did we know YM = Y ? well print a row that contains YM, observe and make a judgement call
import numpy as np
# Map multi-letter / lower-case leftovers to a single code (judgement calls made by inspecting
# rows); None becomes NaN.
match_values = ['YM','MH','YBWH','YH','y','YMBWH','YMBW','WH','yr','hr','mth','wk','bi',None]
replace_with_values = ['Y','H','H','Y','Y','Y','H','H','Y','H','M','W','B',np.nan]
# Assign the result back: replace(..., inplace=True) on a column is chained assignment and does
# not update df under pandas Copy-on-Write.
df['WAGE_UNIT_OF_PAY_FROM'] = df['WAGE_UNIT_OF_PAY_FROM'].replace(to_replace=match_values, value=replace_with_values, regex=False)

In [None]:
# Store the normalised unit codes as a categorical
df['WAGE_UNIT_OF_PAY_FROM'] = df['WAGE_UNIT_OF_PAY_FROM'].astype(dtype='category')

# WAGE_UNIT_OF_PAY_TO column

In [None]:
# Distinct raw WAGE_UNIT_OF_PAY_TO values
df.WAGE_UNIT_OF_PAY_TO.unique()

In [None]:
# https://flcdatacenter.com/CaseH1B.aspx
import numpy as np
# Map long unit names to one-letter codes; the form placeholder becomes missing.
match_values = ['Year','Hour','Month','Week','2 weeks','Select Pay Range']
replace_with_values = ['Y','H','M','W','B',np.nan]
# Assign the result back: replace(..., inplace=True) on a column is chained assignment and does
# not update df under pandas Copy-on-Write.
df['WAGE_UNIT_OF_PAY_TO'] = df['WAGE_UNIT_OF_PAY_TO'].replace(to_replace=match_values, value=replace_with_values, regex=False)

In [None]:
# https://flcdatacenter.com/CaseH1B.aspx
import numpy as np
# Map abbreviations to one-letter codes; wage amounts that leaked into the unit column are
# treated as yearly.
match_values = ['YR','HR','MTH','WK','BI','Bi-Weekly','82,817.00','75,045.00','138,000.00','54,000.00','64,000.00']
replace_with_values = ['Y','H','M','W','B','B','Y','Y','Y','Y','Y']
# Assign the result back: replace(..., inplace=True) on a column is chained assignment and does
# not update df under pandas Copy-on-Write.
df['WAGE_UNIT_OF_PAY_TO'] = df['WAGE_UNIT_OF_PAY_TO'].replace(to_replace=match_values, value=replace_with_values, regex=False)

In [None]:
# https://flcdatacenter.com/CaseH1B.aspx
# How did we know YM = Y ? well print a row that contains YM, observe and make a judgement call
import numpy as np
# Map multi-letter / lower-case leftovers to a single code (judgement calls made by inspecting
# rows); None becomes NaN.
match_values = ['YM','MH','YBWH','YH','y','YMBWH','YMBW','WH','yr','hr','mth','wk','bi',None]
replace_with_values = ['Y','H','H','Y','Y','Y','H','H','Y','H','M','W','B',np.nan]
# Assign the result back: replace(..., inplace=True) on a column is chained assignment and does
# not update df under pandas Copy-on-Write.
df['WAGE_UNIT_OF_PAY_TO'] = df['WAGE_UNIT_OF_PAY_TO'].replace(to_replace=match_values, value=replace_with_values, regex=False)

In [None]:
# Store the normalised unit codes as a categorical
df['WAGE_UNIT_OF_PAY_TO'] = df['WAGE_UNIT_OF_PAY_TO'].astype(dtype='category')

# H1B_DEPENDENT column

In [None]:
# Distinct H1B_DEPENDENT values
df.H1B_DEPENDENT.unique()

# WORKSITE_POSTAL_CODE column

In [None]:
# Keep only the leading (up to 5) digits before converting. ZIP+4 values such as '94043-1351'
# fail to parse as a whole and were coerced to NaN, losing the ZIP entirely. Values with no
# leading digits still become NaN.
df['WORKSITE_POSTAL_CODE'] = pd.to_numeric(
    df['WORKSITE_POSTAL_CODE'].astype(str).str.extract(r'^\s*(\d{1,5})', expand=False),
    errors="coerce")

# EMPLOYER_POSTAL_CODE column

In [None]:
# Keep only the leading (up to 5) digits before converting. ZIP+4 values such as '94043-1351'
# fail to parse as a whole and were coerced to NaN, losing the ZIP entirely. Values with no
# leading digits still become NaN.
df['EMPLOYER_POSTAL_CODE'] = pd.to_numeric(
    df['EMPLOYER_POSTAL_CODE'].astype(str).str.extract(r'^\s*(\d{1,5})', expand=False),
    errors="coerce")

# Save the cleaned data back to disk as parquet

In [None]:
# Persist the cleaned 2001-2019 table; index=False keeps the row index out of the file.
df.to_parquet(os.path.join(PARQUET_MASTER_SCHEMA, "2001-2019.snappy.parquet"),
              engine="pyarrow", compression='snappy', index=False)

# Normalize every prevailing wage to a yearly salary

In [None]:
import os
FOLDER_NAME = "LCAProgramsH1BH1B1E3"
YEAR_MONTH = "2019-10"  # pinned snapshot; for the current month use datetime.today().strftime("%Y-%m")
# All data lives under the us-visa-data folder unless the DOWNLOAD_DIR environment variable overrides it
DOWNLOAD_DIR = os.getenv('DOWNLOAD_DIR', '../us-visa-data')
PARQUET_DIR = "/".join([DOWNLOAD_DIR, FOLDER_NAME, YEAR_MONTH, "downloads", "parquet"])
PARQUET_MASTER_SCHEMA = "/".join([PARQUET_DIR, "master_schema"])

# Reference list of the master-schema columns
MASTER_SCHEMA = [
    # case
    'YEAR_CASE_BELONGS_TO', 'CASE_NUMBER', 'CASE_STATUS', 'CASE_SUBMITTED', 'DECISION_DATE',
    'VISA_CLASS', 'EMPLOYMENT_START_DATE', 'EMPLOYMENT_END_DATE',
    # employer
    'EMPLOYER_NAME', 'EMPLOYER_ADDRESS', 'EMPLOYER_CITY', 'EMPLOYER_STATE', 'EMPLOYER_POSTAL_CODE',
    'EMPLOYER_COUNTRY', 'EMPLOYER_PROVINCE', 'EMPLOYER_PHONE', 'EMPLOYER_PHONE_EXT',
    # agent / attorney
    'AGENT_REPRESENTING_EMPLOYER', 'AGENT_ATTORNEY_NAME', 'AGENT_ATTORNEY_CITY', 'AGENT_ATTORNEY_STATE',
    # job and wages
    'JOB_TITLE', 'SOC_CODE', 'SOC_NAME', 'NAICS_CODE', 'TOTAL_WORKERS', 'FULL_TIME_POSITION',
    'PREVAILING_WAGE', 'PW_UNIT_OF_PAY', 'WAGE_RATE_OF_PAY_FROM', 'WAGE_RATE_OF_PAY_TO',
    'WAGE_UNIT_OF_PAY_FROM', 'WAGE_UNIT_OF_PAY_TO', 'H1B_DEPENDENT', 'WILLFUL_VIOLATOR',
    # worksite
    'WORKSITE_CITY', 'WORKSITE_COUNTY', 'WORKSITE_STATE', 'WORKSITE_POSTAL_CODE',
]

In [None]:
import pandas as pd
import os
# Reload the cleaned 2001-2019 table written by the cleaning cells
df = pd.read_parquet(path=os.path.join(PARQUET_MASTER_SCHEMA, "2001-2019.snappy.parquet"))

In [None]:
# Annualise PREVAILING_WAGE according to PW_UNIT_OF_PAY. The row-wise apply in this cell takes
# more than 15 min on the full frame.
# Example query once NORMALIZED_SALARY exists:
# df[(df['YEAR_CASE_BELONGS_TO']==2010)].nlargest(20,['NORMALIZED_SALARY'], keep="all").filter(items=['YEAR_CASE_BELONGS_TO','EMPLOYER_NAME','EMPLOYER_CITY','EMPLOYER_STATE','PW_UNIT_OF_PAY','PREVAILING_WAGE','NORMALIZED_SALARY'])

# Canonical unit for each accepted PW_UNIT_OF_PAY spelling (compared upper-cased): the long names
# plus the short codes used for the WAGE_UNIT_OF_PAY_* columns in this notebook.
_PW_UNIT_ALIASES = {
    'YEAR': 'Year', 'Y': 'Year', 'YR': 'Year',
    'HOUR': 'Hour', 'H': 'Hour', 'HR': 'Hour',
    'WEEK': 'Week', 'W': 'Week', 'WK': 'Week',
    'BI-WEEKLY': 'Bi-Weekly', 'B': 'Bi-Weekly', 'BI': 'Bi-Weekly', '2 WEEKS': 'Bi-Weekly',
    'MONTH': 'Month', 'M': 'Month', 'MTH': 'Month',
}
# Pay periods per year. Hour uses 40 h/week x 52 weeks = 2080 h, consistent with Week x 52
# (8*5*4*12 = 1920 assumed only 48 weeks). Bi-weekly pay has 26 periods, not 24.
_PERIODS_PER_YEAR = {'Hour': 40 * 52, 'Week': 52, 'Bi-Weekly': 26, 'Month': 12}
# Annualised hourly figures at or above this are treated as a mislabelled unit (bad 2010 data).
_MAX_PLAUSIBLE_HOURLY_ANNUAL = 600000


def normalized_salary(row):
    """Return the row's PREVAILING_WAGE expressed as a yearly amount.

    row must provide 'PW_UNIT_OF_PAY' and a numeric 'PREVAILING_WAGE'.
      - missing unit or Year: the wage unchanged
      - Hour: wage * 2080, unless that reaches 600,000, in which case the unit
        is assumed wrong and the raw wage is returned
      - Week * 52, Bi-Weekly * 26, Month * 12
      - any other unit: None
    """
    unit = row['PW_UNIT_OF_PAY']
    wage = row['PREVAILING_WAGE']
    if pd.isnull(unit):
        return wage
    canonical = _PW_UNIT_ALIASES.get(str(unit).strip().upper())
    if canonical is None:
        return None
    if canonical == 'Year':
        return wage
    yearly = wage * _PERIODS_PER_YEAR[canonical]
    if canonical == 'Hour' and not yearly < _MAX_PLAUSIBLE_HOURLY_ANNUAL:
        # Implausible as an hourly rate: assume the wage was already yearly.
        return wage
    return yearly

# Row-wise: each row is passed to normalized_salary directly (no wrapper lambda needed)
df['NORMALIZED_SALARY'] = df.apply(normalized_salary, axis=1)

In [None]:
# Persist the table with NORMALIZED_SALARY; index=False keeps the row index out of the file.
df.to_parquet(os.path.join(PARQUET_MASTER_SCHEMA, "2001-2019_normalized_salary.snappy.parquet"),
              engine="pyarrow", compression='snappy', index=False)

# Derive State from WORKSITE_POSTAL_CODE

In [None]:
! pip install uszipcode==0.2.4

In [None]:
from uszipcode import SearchEngine
import functools

search = SearchEngine(simple_zipcode=True)


@functools.lru_cache(maxsize=None)
def _state_for_zip(zip_text):
    # One uszipcode query per distinct ZIP string; the column repeats the same ZIPs many times.
    return search.by_zipcode(zip_text).to_dict()['state']


def return_state(myzip):
    """Return the state for a numeric WORKSITE_POSTAL_CODE, or myzip itself when it is null.

    Lookups are memoised per ZIP. Nothing is printed: the previous per-row
    print wrote one line for each of the millions of rows in df.
    """
    if pd.isnull(myzip):
        return myzip
    # The column is numeric, so e.g. '02139' arrives as 2139; pad back to five digits
    # (harmless if uszipcode already zero-pads).
    return _state_for_zip(str(int(myzip)).zfill(5))


# Series.apply(convert_dtype=False) is deprecated since pandas 2.1; astype(object) is the replacement.
df['POSTAL_TO_STATE'] = df['WORKSITE_POSTAL_CODE'].astype(object).apply(return_state)

In [None]:
# Persist the table with POSTAL_TO_STATE; index=False keeps the row index out of the file.
df.to_parquet(os.path.join(PARQUET_MASTER_SCHEMA, "2001-2019_normalized_salary_postal_to_state.snappy.parquet"),
              engine="pyarrow", compression='snappy', index=False)

# Derive County (and its FIPS) from WORKSITE_POSTAL_CODE

In [None]:
# 2019 cases whose worksite ZIP maps to California. .copy() makes this an independent frame,
# so the POSTAL_TO_COUNTY / FIPS columns added later do not raise SettingWithCopyWarning.
df_2019_cali = df[(df['POSTAL_TO_STATE']=='CA')&(df['YEAR_CASE_BELONGS_TO']==2019)].copy()

In [None]:
# County -> FIPS lookup table; every column is read as text (presumably so FIPS codes such as
# '06001' keep their leading zeros).
df_fips = pd.read_csv('fips_county_state.csv', dtype=dict(FIPS=str, Name=str, State=str))
df_fips

In [None]:
from uszipcode import SearchEngine
import functools

search = SearchEngine(simple_zipcode=False)


@functools.lru_cache(maxsize=None)
def _county_for_zip(zip_text):
    # One uszipcode query per distinct ZIP string; the column repeats the same ZIPs many times.
    return search.by_zipcode(zip_text).to_dict()['county']


def return_county(myzip):
    """Return the county name for a numeric WORKSITE_POSTAL_CODE, or myzip itself when it is null.

    Lookups are memoised per ZIP, and nothing is printed per row; the
    previous per-row print flooded the cell output.
    """
    if pd.isnull(myzip):
        return myzip
    # The column is numeric, so e.g. '02139' arrives as 2139; pad back to five digits
    # (harmless if uszipcode already zero-pads).
    return _county_for_zip(str(int(myzip)).zfill(5))


# Series.apply(convert_dtype=False) is deprecated since pandas 2.1; astype(object) is the replacement.
df_2019_cali['POSTAL_TO_COUNTY'] = df_2019_cali['WORKSITE_POSTAL_CODE'].astype(object).apply(return_county)

In [None]:
# Checkpoint to disk: the county lookup above takes more than 30 min. index=False drops the row index.
df_2019_cali.to_parquet(os.path.join(PARQUET_MASTER_SCHEMA, "2019_CALI_WORKSITE_POSTAL_TO_COUNTY.snappy.parquet"),
                        engine="pyarrow", compression='snappy', index=False)

In [None]:
import os
# Resume point: rebuild paths and inputs so the FIPS steps below run after a kernel restart.
FOLDER_NAME = "LCAProgramsH1BH1B1E3"
# YEAR_MONTH = datetime.today().strftime("%Y-%m")
YEAR_MONTH = "2019-10"
# The default must match the one used when the parquet files were written ('../us-visa-data');
# '../../us-visa-data' pointed at a different directory.
DOWNLOAD_DIR = os.getenv('DOWNLOAD_DIR', '../us-visa-data')
PARQUET_DIR = DOWNLOAD_DIR + "/" + FOLDER_NAME + "/" + YEAR_MONTH + "/downloads/parquet"
PARQUET_MASTER_SCHEMA = PARQUET_DIR + "/master_schema"

import pandas as pd
df_2019_cali = pd.read_parquet(os.path.join(PARQUET_MASTER_SCHEMA,"2019_CALI_WORKSITE_POSTAL_TO_COUNTY.snappy.parquet"))
# return_fips also needs the county -> FIPS table; reload it here so this resume point does not
# depend on kernel state from earlier cells.
df_fips = pd.read_csv('fips_county_state.csv', dtype={'FIPS': str, 'Name': str, 'State': str})

In [None]:
def return_fips(county):
    """Return the FIPS code for a county name such as 'Santa Clara County'.

    Every occurrence of 'County' is removed and the stripped remainder is
    matched against df_fips['Name']; the first matching row's FIPS is
    returned. Nulls are returned unchanged. A name with no match yields NaN
    instead of raising IndexError. Nothing is printed per call.

    NOTE(review): the lookup ignores df_fips['State']. County names repeat
    across states (e.g. Orange County exists in CA and FL), so the first match
    may belong to another state. Consider also filtering on State once its
    format is confirmed.
    """
    if pd.isnull(county):
        return county
    name = county.replace('County', '').strip()
    matches = df_fips.loc[df_fips['Name'] == name, 'FIPS']
    if matches.empty:
        return float('nan')
    return matches.iloc[0]
    
# Resolve each distinct county once and map the results back, instead of scanning df_fips for
# every row. This also avoids Series.apply(convert_dtype=False), deprecated since pandas 2.1.
# Missing counties map to NaN, which astype('str') renders as 'nan'.
county_to_fips = {county: return_fips(county)
                  for county in df_2019_cali['POSTAL_TO_COUNTY'].dropna().unique()}
df_2019_cali['FIPS'] = df_2019_cali['POSTAL_TO_COUNTY'].map(county_to_fips).astype('str')

In [None]:
# Checkpoint to disk: the FIPS step above takes more than 30 min. index=False drops the row index.
df_2019_cali.to_parquet(os.path.join(PARQUET_MASTER_SCHEMA, "2019_CALI_WORKSITE_POSTAL_TO_COUNTY_FIPS.snappy.parquet"),
                        engine="pyarrow", compression='snappy', index=False)