## Clean up the data 

# With Pyspark 

In [1]:
import spark_setup
import pyspark
from pyspark.sql import SQLContext
from pyspark.sql import functions as F

sc = pyspark.SparkContext()
sql = SQLContext(sc)

path = 'us_perm_visas.csv' #larger dataset 
Spath = 'us_perm_visas_small.csv' #smaller dataset 
filename = Spath#"file:///home/s1960822/visas_clean.csv"

In [2]:
good_cols = ["class_of_admission",
             "case_status",
             "country_of_citzenship",
             "decision_date",
             "employer_city",
#              "employer_country" ,
             "employer_decl_info_title" ,
             "employer_name",
             "employer_num_employees",
             "employer_state",
#              "foreign_worker_info_alt_edu_experience",
             "foreign_worker_info_birth_country",
             "foreign_worker_info_city",
             "foreign_worker_info_education",
#              "foreign_worker_info_education_other",
             "foreign_worker_info_inst",
             "foreign_worker_info_major",
#              "foreign_worker_info_req_experience",
             "foreign_worker_info_state",
#              "fw_info_yr_rel_edu_completed" ,
             "job_info_work_city",
             "job_info_experience",
             "job_info_experience_num_months",
#              "job_info_title",
             "job_info_work_state",
             "naics_2007_us_title", 
             "naics_title",
             "wage_offer_from_9089",
             "wage_offer_unit_of_pay_9089",
             "pw_job_title_9089",
             "pw_level_9089", 
             ]

In [3]:
#create DF
df = (sql.read.format("com.databricks.spark.csv").option("header", "true").load(filename))

# reduce to cols of interest 
df1 = df.select([col for col in good_cols])

#Filter out Withdrawn 
df2 = df1.filter(df1.case_status != 'Withdrawn')

# Turn Expired into Certified 
df3 = df2.withColumn('case_status',
                     F.when(df2['case_status']=='Certified-Expired', "Certified").\
                    otherwise(df2['case_status']))

#bin employer size 
df4 = df3.withColumn("L", F.when(df3['employer_num_employees'] < 1e7, 'large' ))

df5 = df4.withColumn("M", F.when(df4['employer_num_employees'] < 200, 'medium').otherwise(df4['L']))

df6 = df5.withColumn("bin_employer_num_employees",
                        F.when(df5['employer_num_employees'] < 50, 'small').otherwise(df5['M']))

df7 = df6.select([col for col in good_cols])

good_cols.append('bin_employer_num_employees')
#group job title into single word vec 
df8 = df7.withColumn('title', F.array('pw_job_title_9089',
                                "naics_title",
                                "naics_2007_us_title")[0])

In [4]:
#check the data 
import pandas as pd 

SPARK = pd.DataFrame(df8.collect())
SPARK.shape

(973, 26)

# With Pandas 

In [5]:
import pandas as pd 
import numpy as np 
import matplotlib.pyplot as plt 
%matplotlib inline

In [6]:
path = 'us_perm_visas.csv' #larger dataset 
Spath = 'us_perm_visas_small.csv' #smaller dataset 
visas = pd.read_csv(Spath, dtype='str')

print (visas.shape)
visas.head(5)

(1014, 154)


Unnamed: 0,add_these_pw_job_title_9089,agent_city,agent_firm_name,agent_state,application_type,case_no,case_number,case_received_date,case_status,class_of_admission,...,ri_pvt_employment_firm_to,ri_us_workers_considered,schd_a_sheepherder,us_economic_sector,wage_offer_from_9089,wage_offer_to_9089,wage_offer_unit_of_pay_9089,wage_offered_from_9089,wage_offered_to_9089,wage_offered_unit_of_pay_9089
0,,,,,PERM,A-07323-97014,,,Certified,J-1,...,,,,IT,75629.0,,yr,,,
1,,,,,PERM,A-07332-99439,,,Denied,B-2,...,,,,Other Economic Sector,37024.0,,yr,,,
2,,,,,PERM,A-07333-99643,,,Certified,H-1B,...,,,,Aerospace,47923.0,,yr,,,
3,,,,,PERM,A-07339-01930,,,Certified,B-2,...,,,,Other Economic Sector,10.97,,hr,,,
4,,,,,PERM,A-07345-03565,,,Certified,L-1,...,,,,Advanced Mfg,100000.0,,yr,,,


In [7]:
good_cols = ["class_of_admission",
             "case_status",
             "country_of_citzenship",
             "decision_date",
             "employer_city",
#              "employer_country" ,
             "employer_decl_info_title" ,
             "employer_name",
             "employer_num_employees",
             "employer_state",
#              "foreign_worker_info_alt_edu_experience",
             "foreign_worker_info_birth_country",
             "foreign_worker_info_city",
             "foreign_worker_info_education",
#              "foreign_worker_info_education_other",
             "foreign_worker_info_inst",
             "foreign_worker_info_major",
#              "foreign_worker_info_req_experience",
             "foreign_worker_info_state",
#              "fw_info_yr_rel_edu_completed" ,
             "job_info_work_city",
             "job_info_experience",
             "job_info_experience_num_months",
#              "job_info_title",
             "job_info_work_state",
             "naics_2007_us_title", 
             "naics_title",
             "wage_offer_from_9089",
             "wage_offer_unit_of_pay_9089",
             "pw_job_title_9089",
             "pw_level_9089"]

In [8]:
'''
drop irrelevant columns 
'''

for col in visas.columns: 
    if col not in good_cols:
        visas = visas.drop(col, axis=1)
# visas.head()

In [9]:
'''
clean/binarize status
''' 

visas = visas[visas.case_status != "Withdrawn"]
visas.case_status = visas.case_status.apply(lambda x: "Certified" if x == "Certified-Expired" else x)
# visas.case_status

In [10]:
'''
bin sponsor exec status 
'''

def isNaN(num):
    return num != num

def isExec(string):
    string = string.lower()
    for e in execs: 
        if e in string: 
            return True
            
execs = ['executive', 'ceo', 'cfo', 'coo', 'chief', 'director', 'president', 'vice', 'vp', 'director', 'senior']

visas.employer_decl_info_title = visas.employer_decl_info_title.apply(lambda x: 'U' if isNaN(x)  else x) #get rid of NANs
visas.employer_decl_info_title = visas.employer_decl_info_title.apply(lambda x: 'E' if isExec(x) else x) 
visas.employer_decl_info_title = visas.employer_decl_info_title.apply(lambda x: 'N' if len(x) > 1 else x) 


In [11]:
'''
bin employee size
'''
def checkSize(val, thresh):
    try: 
        if int(val) < thresh and int(val) > 0: 
            return True
    except: 
        pass
     
visas['bin_employer_num_employees'] = visas['employer_num_employees']
visas.employer_num_employees = visas.employer_num_employees.apply(lambda x: -1 if isNaN(x) else x)
visas.employer_num_employees = visas.employer_num_employees.apply(lambda x: int(x))

visas.bin_employer_num_employees = visas.bin_employer_num_employees.apply(lambda x: 'small' if checkSize(x, 50) else x)
visas.bin_employer_num_employees = visas.bin_employer_num_employees.apply(lambda x: 'medium' if checkSize(x, 200) else x)
visas.bin_employer_num_employees = visas.bin_employer_num_employees.apply(lambda x: 'large' if checkSize(x, 7e10) else x)

In [12]:
'''
clean job experience months 
'''
def checkExperience(x):
    if x['job_info_experience'] == 'N':
        x['job_info_experience_num_months'] = 0
    return x

visas = visas.apply(checkExperience, axis=1)

In [13]:
'''
clean job title 
'''
def checkTitle(x):
    if isNaN(x["naics_title"]) and isNaN(x["naics_2007_us_title"] and isNaN(x['pw_job_title_9089'])):
        x['title'] = 'None'
    else: 
        x['title'] = '{} {} {}'.format(x['pw_job_title_9089'], x["naics_title"], x["naics_2007_us_title"]).lower()
    return x

visas['title'] = visas["naics_2007_us_title"].copy
visas = visas.apply(checkTitle, axis=1)

In [14]:
visas.head(2)

Unnamed: 0,case_status,class_of_admission,country_of_citzenship,decision_date,employer_city,employer_decl_info_title,employer_name,employer_num_employees,employer_state,foreign_worker_info_birth_country,...,job_info_work_city,job_info_work_state,naics_2007_us_title,naics_title,pw_job_title_9089,pw_level_9089,wage_offer_from_9089,wage_offer_unit_of_pay_9089,bin_employer_num_employees,title
0,Certified,J-1,ARMENIA,2/1/2012,NEW YORK,U,NETSOFT USA INC.,-1,NY,,...,New York,NY,Computer Systems Design Services,,"Computer Software Engineers, Applications",Level II,75629,yr,,"computer software engineers, applications nan ..."
1,Denied,B-2,POLAND,12/21/2011,CARLSTADT,U,PINNACLE ENVIRONEMNTAL CORP,-1,NY,,...,New York,NY,Hazardous Waste Treatment and Disposal,,ASBESTOS HANDLER,Level I,37024,yr,,asbestos handler nan hazardous waste treatment...


In [15]:
# visas.to_csv('visas_clean.csv')
# visas = pd.read_csv('visas_clean.csv', encoding = "ISO-8859-1")