In [1]:
import pyspark

# Obtain the Spark entry point. getOrCreate() reuses a SparkContext that is
# already running in this kernel, so re-executing this cell (or Run All after a
# partial run) does not fail with "Cannot run multiple SparkContexts at once",
# which the plain SparkContext() constructor raises on a second call.
sc = pyspark.SparkContext.getOrCreate()


In [2]:
from pyspark.sql import SQLContext

# SQL entry point bound to the SparkContext created above; it is used below to
# read the CSV, register temporary tables and run SQL queries.
sql = SQLContext(sparkContext=sc)

In [41]:
# Load the Kaggle H-1B extract; its first line carries the column names.
# No schema inference is requested, so every column is read as a string.
df = sql.read.load(
    "h1b_kaggle.csv",
    format="com.databricks.spark.csv",
    header="true",
)

In [42]:
# Peek at the first record as a Row object.
df.first()

Row(_c0='1', CASE_STATUS='CERTIFIED-WITHDRAWN', EMPLOYER_NAME='UNIVERSITY OF MICHIGAN', SOC_NAME='BIOCHEMISTS AND BIOPHYSICISTS', JOB_TITLE='POSTDOCTORAL RESEARCH FELLOW', FULL_TIME_POSITION='N', PREVAILING_WAGE='36067', YEAR='2016', WORKSITE='ANN ARBOR, MICHIGAN', lon='-83.7430378', lat='42.2808256')

In [43]:
# First five records as a list of Row objects.
df.head(5)

[Row(_c0='1', CASE_STATUS='CERTIFIED-WITHDRAWN', EMPLOYER_NAME='UNIVERSITY OF MICHIGAN', SOC_NAME='BIOCHEMISTS AND BIOPHYSICISTS', JOB_TITLE='POSTDOCTORAL RESEARCH FELLOW', FULL_TIME_POSITION='N', PREVAILING_WAGE='36067', YEAR='2016', WORKSITE='ANN ARBOR, MICHIGAN', lon='-83.7430378', lat='42.2808256'),
 Row(_c0='2', CASE_STATUS='CERTIFIED-WITHDRAWN', EMPLOYER_NAME='GOODMAN NETWORKS, INC.', SOC_NAME='CHIEF EXECUTIVES', JOB_TITLE='CHIEF OPERATING OFFICER', FULL_TIME_POSITION='Y', PREVAILING_WAGE='242674', YEAR='2016', WORKSITE='PLANO, TEXAS', lon='-96.6988856', lat='33.0198431'),
 Row(_c0='3', CASE_STATUS='CERTIFIED-WITHDRAWN', EMPLOYER_NAME='PORTS AMERICA GROUP, INC.', SOC_NAME='CHIEF EXECUTIVES', JOB_TITLE='CHIEF PROCESS OFFICER', FULL_TIME_POSITION='Y', PREVAILING_WAGE='193066', YEAR='2016', WORKSITE='JERSEY CITY, NEW JERSEY', lon='-74.0776417', lat='40.7281575'),
 Row(_c0='4', CASE_STATUS='CERTIFIED-WITHDRAWN', EMPLOYER_NAME='GATES CORPORATION, A WHOLLY-OWNED SUBSIDIARY OF TOMKINS P

In [4]:
# Tabular preview of the first five rows.
df.show(n=5)

+---+-------------------+--------------------+--------------------+--------------------+------------------+---------------+----+--------------------+-----------+----------+
|_c0|        CASE_STATUS|       EMPLOYER_NAME|            SOC_NAME|           JOB_TITLE|FULL_TIME_POSITION|PREVAILING_WAGE|YEAR|            WORKSITE|        lon|       lat|
+---+-------------------+--------------------+--------------------+--------------------+------------------+---------------+----+--------------------+-----------+----------+
|  1|CERTIFIED-WITHDRAWN|UNIVERSITY OF MIC...|BIOCHEMISTS AND B...|POSTDOCTORAL RESE...|                 N|          36067|2016| ANN ARBOR, MICHIGAN|-83.7430378|42.2808256|
|  2|CERTIFIED-WITHDRAWN|GOODMAN NETWORKS,...|    CHIEF EXECUTIVES|CHIEF OPERATING O...|                 Y|         242674|2016|        PLANO, TEXAS|-96.6988856|33.0198431|
|  3|CERTIFIED-WITHDRAWN|PORTS AMERICA GRO...|    CHIEF EXECUTIVES|CHIEF PROCESS OFF...|                 Y|         193066|2016|JERSEY 

In [44]:
# Total number of records loaded from the CSV.
total_rows = df.count()
total_rows

3002458

In [45]:
# Inspect the data type of every column. Because the CSV was read without
# schema inference, all columns are expected to show up as strings here.
df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- CASE_STATUS: string (nullable = true)
 |-- EMPLOYER_NAME: string (nullable = true)
 |-- SOC_NAME: string (nullable = true)
 |-- JOB_TITLE: string (nullable = true)
 |-- FULL_TIME_POSITION: string (nullable = true)
 |-- PREVAILING_WAGE: string (nullable = true)
 |-- YEAR: string (nullable = true)
 |-- WORKSITE: string (nullable = true)
 |-- lon: string (nullable = true)
 |-- lat: string (nullable = true)



In [21]:
from pyspark.mllib.tree import DecisionTree, DecisionTreeModel

In [9]:
# PREVAILING_WAGE arrives as text from the CSV; convert it to a double so it
# can be used numerically. Under Spark's default (non-ANSI) casting, text that
# does not parse as a number becomes null.
wage_as_double = df["PREVAILING_WAGE"].cast("double")
df = df.withColumn("PREVAILING_WAGE", wage_as_double)

In [12]:
# For each column, count the rows whose value is null or NaN.
from pyspark.sql.functions import isnan, when, count, col

null_or_nan_counts = [
    count(when(isnan(name) | col(name).isNull(), name)).alias(name)
    for name in df.columns
]
df.select(null_or_nan_counts).show()

+---+-----------+-------------+--------+---------+------------------+---------------+----+--------+---+---+
|_c0|CASE_STATUS|EMPLOYER_NAME|SOC_NAME|JOB_TITLE|FULL_TIME_POSITION|PREVAILING_WAGE|YEAR|WORKSITE|lon|lat|
+---+-----------+-------------+--------+---------+------------------+---------------+----+--------+---+---+
|  0|          0|            0|       0|        0|                 0|              0|   0|       0|  0|  7|
+---+-----------+-------------+--------+---------+------------------+---------------+----+--------+---+---+



In [None]:
# Split WORKSITE (e.g. "ANN ARBOR, MICHIGAN") on the comma into CITY and STATE.
# trim() removes the blank that follows the comma: without it STATE would be
# " MICHIGAN" rather than "MICHIGAN", and later equality filters such as
# STATE != 'NA' would silently fail to match.
# NOTE(review): a WORKSITE containing more than one comma puts only its second
# segment into STATE, not the last one; check whether such rows exist.
from pyspark.sql.functions import split, trim

split_col = split(df['WORKSITE'], ',')
df = df.withColumn('CITY', trim(split_col.getItem(0)))
df = df.withColumn('STATE', trim(split_col.getItem(1)))

In [21]:
#Replacing nulls with specific values - DONT THINK THIS WORKED so used sql below
# NOTE(review): na.fill() does not modify df in place. It returns a new
# DataFrame (the DataFrame repr printed under this cell is that unassigned
# result), so the fill would only take effect as `df = df.na.fill({...})`.
# It also only replaces real nulls, not placeholder strings such as 'NA'.
#df.na.fill({'case_status': 'Unknown', 'employer_name': 'Unknown', 'soc_name':'Unknown', 'job_title':'Unknown', \
#'full_time_position':'Unknown','prevailing_wage':0,'year':'Unknown','worksite':'Unknown','lon':0,'lat':0})

DataFrame[_c0: string, CASE_STATUS: string, EMPLOYER_NAME: string, SOC_NAME: string, JOB_TITLE: string, FULL_TIME_POSITION: string, PREVAILING_WAGE: string, YEAR: string, WORKSITE: string, lon: string, lat: string]

In [18]:
# Expose the current DataFrame to SQL queries under the name "h1_b_data".
sql.registerDataFrameAsTable(df=df, tableName="h1_b_data")

In [21]:
# Partition the rows by whether the SOC (occupation) name mentions "computer".
# UPPER() makes the match case-insensitive; the previous pair of LIKE patterns
# only caught "COMPUTER" and "Computer" and missed any other casing.
# The second query is the exact complement of the first. It explicitly keeps
# NULL SOC_NAME rows, because NULL LIKE ... evaluates to NULL and such rows
# would otherwise fall out of both partitions, so the later df2/df3 union
# would not restore the full dataset.
df2 = sql.sql(
    "SELECT * FROM h1_b_data "
    "WHERE UPPER(SOC_NAME) LIKE '%COMPUTER%'"
)
df3 = sql.sql(
    "SELECT * FROM h1_b_data "
    "WHERE SOC_NAME IS NULL OR UPPER(SOC_NAME) NOT LIKE '%COMPUTER%'"
)

# Size of the "computer" partition.
df2.count()


1328062

In [22]:
# Size of the "non-computer" partition.
non_computer_rows = df3.count()
non_computer_rows

1674396

In [25]:
from pyspark.sql.functions import lit

# Constant binary label per partition: 1 for the rows selected as
# computer-related (df2), 0 for all the others (df3).
df2, df3 = (
    df2.withColumn('SOC_NAME_CONVERTED', lit(1)),
    df3.withColumn('SOC_NAME_CONVERTED', lit(0)),
)

In [26]:
# Confirm the labelled "computer" partition carries the new SOC_NAME_CONVERTED column.
df2.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- CASE_STATUS: string (nullable = true)
 |-- EMPLOYER_NAME: string (nullable = true)
 |-- SOC_NAME: string (nullable = true)
 |-- JOB_TITLE: string (nullable = true)
 |-- FULL_TIME_POSITION: string (nullable = true)
 |-- PREVAILING_WAGE: double (nullable = true)
 |-- YEAR: string (nullable = true)
 |-- WORKSITE: string (nullable = true)
 |-- lon: string (nullable = true)
 |-- lat: string (nullable = true)
 |-- CITY: string (nullable = true)
 |-- STATE: string (nullable = true)
 |-- SOC_NAME_CONVERTED: integer (nullable = false)



In [27]:
# Preview the labelled "computer" partition (Spark's default of 20 rows).
df2.show(n=20)

+----+-------------------+--------------------+--------------------+--------------------+------------------+---------------+----+--------------------+------------+----------+-------------+---------------+------------------+
| _c0|        CASE_STATUS|       EMPLOYER_NAME|            SOC_NAME|           JOB_TITLE|FULL_TIME_POSITION|PREVAILING_WAGE|YEAR|            WORKSITE|         lon|       lat|         CITY|          STATE|SOC_NAME_CONVERTED|
+----+-------------------+--------------------+--------------------+--------------------+------------------+---------------+----+--------------------+------------+----------+-------------+---------------+------------------+
|7337|CERTIFIED-WITHDRAWN|NORTHWEST EVALUAT...|COMPUTER & INFORM...|INFORMATION SERVI...|                 Y|       132475.0|2016|  NEW YORK, NEW YORK| -74.0059413|40.7127837|     NEW YORK|       NEW YORK|                 1|
|7338|CERTIFIED-WITHDRAWN|NORTHWEST EVALUAT...|COMPUTER & INFORM...|INFORMATION SERVI...|               

In [28]:
# Confirm the labelled "non-computer" partition has the same columns as df2,
# which the positional union below relies on.
df3.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- CASE_STATUS: string (nullable = true)
 |-- EMPLOYER_NAME: string (nullable = true)
 |-- SOC_NAME: string (nullable = true)
 |-- JOB_TITLE: string (nullable = true)
 |-- FULL_TIME_POSITION: string (nullable = true)
 |-- PREVAILING_WAGE: double (nullable = true)
 |-- YEAR: string (nullable = true)
 |-- WORKSITE: string (nullable = true)
 |-- lon: string (nullable = true)
 |-- lat: string (nullable = true)
 |-- CITY: string (nullable = true)
 |-- STATE: string (nullable = true)
 |-- SOC_NAME_CONVERTED: integer (nullable = false)



In [30]:
# Preview the labelled "non-computer" partition (Spark's default of 20 rows).
df3.show(n=20)

+---+-------------------+--------------------+--------------------+--------------------+------------------+---------------+----+--------------------+------------+----------+-----------------+--------------+------------------+
|_c0|        CASE_STATUS|       EMPLOYER_NAME|            SOC_NAME|           JOB_TITLE|FULL_TIME_POSITION|PREVAILING_WAGE|YEAR|            WORKSITE|         lon|       lat|             CITY|         STATE|SOC_NAME_CONVERTED|
+---+-------------------+--------------------+--------------------+--------------------+------------------+---------------+----+--------------------+------------+----------+-----------------+--------------+------------------+
|  1|CERTIFIED-WITHDRAWN|UNIVERSITY OF MIC...|BIOCHEMISTS AND B...|POSTDOCTORAL RESE...|                 N|        36067.0|2016| ANN ARBOR, MICHIGAN| -83.7430378|42.2808256|        ANN ARBOR|      MICHIGAN|                 0|
|  2|CERTIFIED-WITHDRAWN|GOODMAN NETWORKS,...|    CHIEF EXECUTIVES|CHIEF OPERATING O...|        

In [31]:
# Recombine both labelled partitions into one DataFrame. union() matches
# columns by position, which is safe here because df2 and df3 both come from
# SELECT * on the same table plus the same added SOC_NAME_CONVERTED column.
result = df2.union(df3)

In [32]:
# Should equal the original row count if the two partitions are complementary.
combined_rows = result.count()
combined_rows

3002458

In [33]:
# Schema of the recombined, labelled dataset.
result.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- CASE_STATUS: string (nullable = true)
 |-- EMPLOYER_NAME: string (nullable = true)
 |-- SOC_NAME: string (nullable = true)
 |-- JOB_TITLE: string (nullable = true)
 |-- FULL_TIME_POSITION: string (nullable = true)
 |-- PREVAILING_WAGE: double (nullable = true)
 |-- YEAR: string (nullable = true)
 |-- WORKSITE: string (nullable = true)
 |-- lon: string (nullable = true)
 |-- lat: string (nullable = true)
 |-- CITY: string (nullable = true)
 |-- STATE: string (nullable = true)
 |-- SOC_NAME_CONVERTED: integer (nullable = false)



In [34]:
# Preview the recombined dataset (Spark's default of 20 rows).
result.show(n=20)

+----+-------------------+--------------------+--------------------+--------------------+------------------+---------------+----+--------------------+------------+----------+-------------+---------------+------------------+
| _c0|        CASE_STATUS|       EMPLOYER_NAME|            SOC_NAME|           JOB_TITLE|FULL_TIME_POSITION|PREVAILING_WAGE|YEAR|            WORKSITE|         lon|       lat|         CITY|          STATE|SOC_NAME_CONVERTED|
+----+-------------------+--------------------+--------------------+--------------------+------------------+---------------+----+--------------------+------------+----------+-------------+---------------+------------------+
|7337|CERTIFIED-WITHDRAWN|NORTHWEST EVALUAT...|COMPUTER & INFORM...|INFORMATION SERVI...|                 Y|       132475.0|2016|  NEW YORK, NEW YORK| -74.0059413|40.7127837|     NEW YORK|       NEW YORK|                 1|
|7338|CERTIFIED-WITHDRAWN|NORTHWEST EVALUAT...|COMPUTER & INFORM...|INFORMATION SERVI...|               

In [35]:
# Make the labelled dataset queryable from SQL.
sql.registerDataFrameAsTable(df=result, tableName="h1_b_data_soc_name_converted")

In [80]:
# Build the modelling table from the labelled data:
#   - keep only the CERTIFIED, CERTIFIED-WITHDRAWN and DENIED case statuses,
#   - keep only rows whose FULL_TIME_POSITION is 'Y' or 'N',
#   - drop rows whose STATE or CITY is missing or the 'NA' placeholder.
# TRIM() makes the 'NA' checks (and the projected STATE/CITY values) robust to
# the leading blank that splitting WORKSITE on ',' can leave (e.g. " MICHIGAN").
df4 = sql.sql("""
    SELECT CASE_STATUS, FULL_TIME_POSITION, PREVAILING_WAGE, YEAR,
           TRIM(STATE) AS STATE, TRIM(CITY) AS CITY, SOC_NAME_CONVERTED
    FROM h1_b_data_soc_name_converted
    WHERE CASE_STATUS IN ('CERTIFIED', 'CERTIFIED-WITHDRAWN', 'DENIED')
      AND FULL_TIME_POSITION IN ('Y', 'N')
      AND STATE IS NOT NULL AND TRIM(STATE) != 'NA'
      AND CITY IS NOT NULL AND TRIM(CITY) != 'NA'
""")
df4.count()

2912586

In [None]:
# NOTE(review): this re-registers `df` (the full dataset, including the
# CITY/STATE columns) as "h1_b_data". It does NOT contain any of the filtering
# applied to df4, so no rows are excluded by this step. This cell had not been
# executed when the notebook was saved.
sql.registerDataFrameAsTable(df, "h1_b_data")

In [13]:
# Preview the first five rows, now including the derived CITY and STATE columns.
df.show(n=5)

+---+-------------------+--------------------+--------------------+--------------------+------------------+---------------+----+--------------------+-----------+----------+-----------+-----------+
|_c0|        CASE_STATUS|       EMPLOYER_NAME|            SOC_NAME|           JOB_TITLE|FULL_TIME_POSITION|PREVAILING_WAGE|YEAR|            WORKSITE|        lon|       lat|       CITY|      STATE|
+---+-------------------+--------------------+--------------------+--------------------+------------------+---------------+----+--------------------+-----------+----------+-----------+-----------+
|  1|CERTIFIED-WITHDRAWN|UNIVERSITY OF MIC...|BIOCHEMISTS AND B...|POSTDOCTORAL RESE...|                 N|        36067.0|2016| ANN ARBOR, MICHIGAN|-83.7430378|42.2808256|  ANN ARBOR|   MICHIGAN|
|  2|CERTIFIED-WITHDRAWN|GOODMAN NETWORKS,...|    CHIEF EXECUTIVES|CHIEF OPERATING O...|                 Y|       242674.0|2016|        PLANO, TEXAS|-96.6988856|33.0198431|      PLANO|      TEXAS|
|  3|CERTIFIED-

In [67]:
# Number of distinct STATE values remaining in the filtered table.
df4.select('STATE').dropDuplicates().count()

53

In [38]:
# NOTE(review): the saved output of this cell does not match df2 as defined
# above. It lacks CITY/STATE/SOC_NAME_CONVERTED, shows PREVAILING_WAGE as
# unconverted text and lists non-computer SOC names. It was presumably produced
# by an earlier df2; re-run the notebook top to bottom before relying on it.
df2.show()

+-----+-------------------+--------------------+--------------------+--------------------+------------------+---------------+----+--------------------+------------+----------+
|  _c0|        CASE_STATUS|       EMPLOYER_NAME|            SOC_NAME|           JOB_TITLE|FULL_TIME_POSITION|PREVAILING_WAGE|YEAR|            WORKSITE|         lon|       lat|
+-----+-------------------+--------------------+--------------------+--------------------+------------------+---------------+----+--------------------+------------+----------+
|22825|CERTIFIED-WITHDRAWN|KLA-TENCOR CORPOR...|PURCHASING AGENTS...|SUPPLY CHAIN SPEC...|                 N|          63378|2016|MILPITAS, CALIFORNIA|-121.8995741|37.4323341|
|22826|CERTIFIED-WITHDRAWN|     MEDTRONIC, INC.|PURCHASING AGENTS...|SUPPLY CHAIN ANALYST|                 N|          56326|2016|CARLSBAD, CALIFORNIA|-117.3505939|33.1580933|
|22827|CERTIFIED-WITHDRAWN|BORG WARNER EMISS...|PURCHASING AGENTS...|               BUYER|                 N|          4

In [84]:
# Number of filtered records per STATE.
r = df4.groupBy('STATE').count()

# Show every group instead of a hard-coded 53 rows, so nothing is silently cut
# off if the number of distinct STATE values changes.
r.show(r.count())

+--------------------+------+
|               STATE| count|
+--------------------+------+
|             INDIANA| 29320|
|             VERMONT|  1877|
|           LOUISIANA| 11059|
|        NORTH DAKOTA|  2817|
|             FLORIDA|102694|
|                UTAH| 12656|
|       NEW HAMPSHIRE|  9603|
|          CALIFORNIA|542657|
|       MASSACHUSETTS|113028|
|              ALASKA|  1363|
|            DELAWARE| 17708|
|      NORTH CAROLINA| 77536|
|        SOUTH DAKOTA|  1827|
|        RHODE ISLAND| 11455|
|            MARYLAND| 54451|
|                IOWA| 16233|
|          NEW JERSEY|207631|
|              HAWAII|  3644|
|            KENTUCKY| 12296|
|         MISSISSIPPI|  4099|
|            ARKANSAS| 14342|
|               IDAHO|  4038|
|            MICHIGAN| 80525|
|        PENNSYLVANIA|107258|
|           WISCONSIN| 31772|
|             ALABAMA|  9910|
|             ARIZONA| 40797|
|           MINNESOTA| 47417|
|            VIRGINIA| 87514|
|            ILLINOIS|156234|
|         

In [83]:
# Re-check the number of distinct STATE values in df4.
df4.dropDuplicates(['STATE']).count()

53

In [81]:
# Discard every row of df4 that has a null in any of its selected columns
# (e.g. a PREVAILING_WAGE that failed to cast to double).
df4 = df4.dropna()

In [82]:
# Rows left after dropping nulls.
clean_rows = df4.count()
clean_rows

2912535

In [78]:
# Distinct STATE values after the null drop.
df4.select('STATE').dropDuplicates().count()

53

In [85]:
# Persist the cleaned table as CSV with a header row. Spark writes a directory
# named H1B_CLEANED.csv containing one part-file per partition, not a single file.
# mode('overwrite') lets the notebook be re-run end to end; the default save
# mode ("errorifexists") fails once the output directory already exists.
(df4.write
    .format('csv')
    .mode('overwrite')
    .option("header", "true")
    .save('H1B_CLEANED.csv'))