In [1]:
from pyspark import SparkContext
import pyspark

#Loading data into spark
h1bDF = spark.read.csv('/FileStore/tables/h1b_kaggle.csv', header=True, inferSchema=True)
#Choose to run on Disk only
h1bDF.persist(pyspark.StorageLevel.DISK_ONLY)

In [2]:
#Part1 Use Apache Spark Core (Transformations and Action on RDD) to answer the following queries?
#1 What is the total number of rows in this dataset?
h1bDF.count()

In [3]:
#2 How many different employers filed for H-1B visa?
employers = h1bDF.select('EMPLOYER_NAME').distinct()
employers.count()

In [4]:
#3 Show 10 different employers ordered by employer name in descending order.
employers.orderBy(['EMPLOYER_NAME'], descending=True).show(10)

In [5]:
#Part2 Use Apache Spark Core (Transformations and Action on RDD) to answer the following queries?
#1 How many visa petitions are there for each ‘case status’ (e.g., 1000 withdrawn, 100 denied)? Order results by the number of visa petitions (i.e., count) in ascending order. Note, ascending order is the default, thus, there is no need to specify that.
h1bDF.groupBy('CASE_STATUS').count().orderBy('count').show()

In [6]:
#2 Repeat the above query to perform in-memory processing. Databricks will display execution time for each ‘cell’, take a screenshot of the cell including the running times for both in-memory and on-disk.
#Not sure about the question, need to clarify with prof.
h1bDF.cache()

In [7]:
#What is the total number of rows in this dataset?
h1bDF.count()

In [8]:
#How many different employers filed for H-1B visa?
employers = h1bDF.select('EMPLOYER_NAME').distinct()
employers.count()

In [9]:
#Show 10 different employers ordered by employer name in descending order.
employers.orderBy(['EMPLOYER_NAME'], descending=True).show(10)

In [10]:
#Part2 Use Apache Spark Core (Transformations and Action on RDD) to answer the following queries?
#How many visa petitions are there for each ‘case status’ (e.g., 1000 withdrawn, 100 denied)? Order results by the number of visa petitions (i.e., count) in ascending order. Note, ascending order is the default, thus, there is no need to specify that.
h1bDF.groupBy('CASE_STATUS').count().orderBy('count').show()

In [11]:
#3 Use Python, R, or Scala to create a new data-frame (RDD) that contains employer names only. Then, iterate (i.e., loop) through the newly created data-frame to print all rows of data/employer names, and count/print number of rows. Hint, a transformation returns a new data-frame. Be patient, this might take a while.
employersDF = h1bDF.select('EMPLOYER_NAME')
for row in employersDF.collect():
  print(row)
employersDF.count()

In [12]:
#Part3 Use Apache Spark Core and Spark SQL to answer the following queries. Hint, you might need to create a temporary view/table in order to perform SQL queries. Cache both RDDs and Views to perform in-memory processing.
#1 How many visa petitions were denied in 2016?
# Create a VIEW to perform SQL queries

#done the RDD cahche in the above part

h1bDF.createOrReplaceTempView("h1bView")
# cache table
spark.catalog.cacheTable("h1bView")
# call .count() to materialize the cache

In [13]:
%sql
/*Part3 Use Apache Spark Core and Spark SQL to answer the following queries. Hint, you might need to create a temporary view/table in order to perform SQL queries. Cache both RDDs and Views to perform in-memory processing.*/
/*How many visa petitions were denied in 2016?*/

select count(CASE_STATUS)
from h1bView 
where CASE_STATUS == 'DENIED' and YEAR == '2016';

count(CASE_STATUS)
9175


In [14]:
%sql
/*Show different employers in California and Pennsylvania who have certified visa petitions in 2013. Order results by employer names in ascending order.*/
select distinct EMPLOYER_NAME
from h1bView
where WORKSITE regexp 'CALIFORNIA | PENNSYLVANIA' and CASE_STATUS == 'CERTIFIED' and YEAR == '2013'
order by EMPLOYER_NAME;

EMPLOYER_NAME
"1 WAY SOLUTIONS, INC."
1SEO.COM
"24/7 MEDIA, INC."
3A SOFT INC.
3CUBE SOLUTIONS INC
"3I INFOTECH, INC"
"3I INFOTECH, INC."
"3I SOLUTIONS, INC."
3K TECHNOLOGIES LLC
"3K TECHNOLOGIES, LLC"


In [15]:
#3 Is it possible to perform SQL queries directly on RDD?
#Yes, in a way, you may be able to do so. You'll need to create your own version of DataFrame. DataFrame is an abstraction over RDDs. Nevertheless, joins, filters, etc. the features that you find with Spark-SQL are optimized with DataFrames but they were made on RDDs first. Spark introduces the concept of an RDD (Resilient Distributed Dataset), an immutable fault-tolerant, distributed collection of objects that can be operated on in parallel. An RDD can contain any type of object and is created by loading an external dataset or distributing a collection from the driver program. Schema RDD is a RDD where you can run SQL on. It is more than SQL. It is a unified interface for structured data.

In [16]:
#Part4 Pre-define the Schema manually (inferring the schema) in order to use Data Aggregation functions and compare numeric values (e.g., <, >, =) using Apache Spark Core. You will need to import the following libraries to pre-define/infer schema and use aggregation functions:
from pyspark.sql.functions import min, max, avg
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql.types import *

#1 Since SOC_NAME column does not specify what sort of data it contains, change the column name to OCCUPATION_CODE for a coherent column name. Take a screenshot of the Schema, to verify that the column name has changed. Hint, use DF.printSchema() to view column names and data types.
h1bDF = h1bDF.withColumnRenamed('SOC_NAME', 'OCCUPATION_CODE')
#convert PREVAILING_WAGE from datatype to decimal 
h1bDF = h1bDF.withColumn('PREVAILING_WAGE', h1bDF.PREVAILING_WAGE.cast(DecimalType(20)))
h1bDF.printSchema()

In [17]:
#2 Show the lowest, highest, and average wage/salary of all filed visa petitions?
#filter to get only the legit value
h1bDF_filter = h1bDF.select('PREVAILING_WAGE').filter(h1bDF.PREVAILING_WAGE>0)
h1bDF_filter.agg(max("PREVAILING_WAGE")).show()
h1bDF_filter.agg(min("PREVAILING_WAGE")).show()
h1bDF_filter.agg(avg("PREVAILING_WAGE")).show()

In [18]:
#3 Print an appropriate message if there are wages that are greater than 80,000. If yes, print how many, else, print “none”.
from pyspark.sql.functions import col, when
h1bDF_filter.withColumn('wage_grt80000', when(col('PREVAILING_WAGE') > 80000, "Yes").otherwise("No")).show()
h1bDF_filter.where(h1bDF_filter['PREVAILING_WAGE']>80000).count()

In [19]:
#3 alternative solution
count = 0
for x in h1bDF_filter.collect():
  if x.PREVAILING_WAGE > 80000:
    count+=1
if count>0:
  print("Yes, there are wages greater than $80,000 : " + str(count))
else:
  print("None")
