In [1]:
import os
import sys
os.environ["SPARK_HOME"] = "/usr/hdp/current/spark2-client"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
sys.path.insert(0, os.environ["PYLIB"] + "/py4j-0.10.4-src.zip")
sys.path.insert(0, os.environ["PYLIB"] + "/pyspark.zip")

In [3]:
spark = SparkSession.builder \
...     .master("local[6]") \
...     .appName("PHD") \
...     .config("spark.driver.memory","15g") \
...     .getOrCreate()

NameError: name 'SparkSession' is not defined

In [2]:
from pyspark.conf import SparkConf
from pyspark import SparkContext
from pyspark.sql import SparkSession

conf = SparkConf().setAppName("PHD Data Set").setMaster('local')
sc = SparkContext(conf=conf)
spark = SparkSession(sc)

In [3]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [4]:
phdDataSchema = StructType([
    StructField("ROW_ID", StringType(), True),
    StructField("CASE_STATUS", StringType(), True),
    StructField("EMPLOYER_NAME", StringType(), True),
    StructField("SOC_NAME", StringType(), True),
    StructField("JOB_TITLE", StringType(), True),
    StructField("FULL_TIME_POSITION", StringType(), True),
    StructField("PREVAILING_WAGE", StringType(), True),
    StructField("YEAR", StringType(), True),        
    StructField("WORKSITE", StringType(), True),
    StructField("lon", StringType(), True),
    StructField("lat", StringType(), True),
   ])

In [5]:
data = spark.read.format("csv")\
       .option("header", "true")\
       .option("inferSchema", "true")\
       .load("hdfs:///user/datasets/B35PHD/", schema = phdDataSchema)

In [6]:
data.printSchema()

root
 |-- ROW_ID: string (nullable = true)
 |-- CASE_STATUS: string (nullable = true)
 |-- EMPLOYER_NAME: string (nullable = true)
 |-- SOC_NAME: string (nullable = true)
 |-- JOB_TITLE: string (nullable = true)
 |-- FULL_TIME_POSITION: string (nullable = true)
 |-- PREVAILING_WAGE: string (nullable = true)
 |-- YEAR: string (nullable = true)
 |-- WORKSITE: string (nullable = true)
 |-- lon: string (nullable = true)
 |-- lat: string (nullable = true)



In [7]:
data.dtypes

[('ROW_ID', 'string'),
 ('CASE_STATUS', 'string'),
 ('EMPLOYER_NAME', 'string'),
 ('SOC_NAME', 'string'),
 ('JOB_TITLE', 'string'),
 ('FULL_TIME_POSITION', 'string'),
 ('PREVAILING_WAGE', 'string'),
 ('YEAR', 'string'),
 ('WORKSITE', 'string'),
 ('lon', 'string'),
 ('lat', 'string')]

In [8]:
print("No. of Columns = {}".format(len(data.columns)))

print('No. of Records = {}'.format(data.count()))

No. of Columns = 11
No. of Records = 3002458


In [9]:
data.head(3)

[Row(ROW_ID=1, CASE_STATUS=u'CERTIFIED-WITHDRAWN', EMPLOYER_NAME=u'UNIVERSITY OF MICHIGAN', SOC_NAME=u'BIOCHEMISTS AND BIOPHYSICISTS', JOB_TITLE=u'POSTDOCTORAL RESEARCH FELLOW', FULL_TIME_POSITION=u'N', PREVAILING_WAGE=36067.0, YEAR=2016, WORKSITE=u'ANN ARBOR, MICHIGAN', lon=-83.7430378, lat=42.2808256),
 Row(ROW_ID=2, CASE_STATUS=u'CERTIFIED-WITHDRAWN', EMPLOYER_NAME=u'GOODMAN NETWORKS, INC.', SOC_NAME=u'CHIEF EXECUTIVES', JOB_TITLE=u'CHIEF OPERATING OFFICER', FULL_TIME_POSITION=u'Y', PREVAILING_WAGE=242674.0, YEAR=2016, WORKSITE=u'PLANO, TEXAS', lon=-96.6988856, lat=33.0198431),
 Row(ROW_ID=3, CASE_STATUS=u'CERTIFIED-WITHDRAWN', EMPLOYER_NAME=u'PORTS AMERICA GROUP, INC.', SOC_NAME=u'CHIEF EXECUTIVES', JOB_TITLE=u'CHIEF PROCESS OFFICER', FULL_TIME_POSITION=u'Y', PREVAILING_WAGE=193066.0, YEAR=2016, WORKSITE=u'JERSEY CITY, NEW JERSEY', lon=-74.0776417, lat=40.7281575)]

In [15]:
data.show(9)

+------+--------------------+--------------------+--------------------+--------------------+------------------+---------------+----+--------------------+------------+----------+
|ROW_ID|         CASE_STATUS|       EMPLOYER_NAME|            SOC_NAME|           JOB_TITLE|FULL_TIME_POSITION|PREVAILING_WAGE|YEAR|            WORKSITE|         lon|       lat|
+------+--------------------+--------------------+--------------------+--------------------+------------------+---------------+----+--------------------+------------+----------+
|     1|CERTIFIEDNAWITHDRAWN|UNIVERSITY OF MIC...|BIOCHEMISTS AND B...|POSTDOCTORAL RESE...|                 N|          36067|2016| ANN ARBOR, MICHIGAN| -83.7430378|42.2808256|
|     2|CERTIFIEDNAWITHDRAWN|GOODMAN NETWORKS,...|    CHIEF EXECUTIVES|CHIEF OPERATING O...|                 Y|         242674|2016|        PLANO, TEXAS| -96.6988856|33.0198431|
|     3|CERTIFIEDNAWITHDRAWN|PORTS AMERICA GRO...|    CHIEF EXECUTIVES|CHIEF PROCESS OFF...|                 Y

In [11]:
data.select('lon').show(6)

+-----------+
|        lon|
+-----------+
|-83.7430378|
|-96.6988856|
|-74.0776417|
|-104.990251|
|-90.1994042|
|-80.1917902|
+-----------+
only showing top 6 rows



In [44]:
data.select('CASE_STATUS').where(col('CASE_STATUS').like("%`%")).show()

+-----------+
|CASE_STATUS|
+-----------+
+-----------+



In [13]:
data=data.withColumn('CASE_STATUS', when(col('CASE_STATUS').like("%-%"), regexp_replace('CASE_STATUS', '-', 'NA')).otherwise(data.CASE_STATUS))

In [22]:
data.select('EMPLOYER_NAME').where(col('EMPLOYER_NAME').like("%\"%")).show()

+-------------+
|EMPLOYER_NAME|
+-------------+
+-------------+



In [21]:
data=data.withColumn('EMPLOYER_NAME', when(col('EMPLOYER_NAME').like("%.%"), regexp_replace('CASE_STATUS', '.', 'NA')).otherwise(data.EMPLOYER_NAME))

In [23]:
data.select('JOB_TITLE').show()

+--------------------+
|           JOB_TITLE|
+--------------------+
|POSTDOCTORAL RESE...|
|CHIEF OPERATING O...|
|CHIEF PROCESS OFF...|
|REGIONAL PRESIDEN...|
|PRESIDENT MONGOLI...|
|EXECUTIVE V P, GL...|
|CHIEF OPERATING O...|
|CHIEF OPERATIONS ...|
|           PRESIDENT|
|           PRESIDENT|
|CHIEF INFORMATION...|
|VICE PRESIDENT AN...|
|   TREASURER AND COO|
|CHIEF COMMERCIAL ...|
|        BOARD MEMBER|
|CHIEF FINANCIAL O...|
|VICE PRESIDENT OF...|
|GENERAL MANAGER, ...|
|                 CEO|
|PRESIDENT, NORTHE...|
+--------------------+
only showing top 20 rows



In [26]:
data=data.withColumn('JOB_TITLE', when(col('JOB_TITLE').like("%,%"), regexp_replace('JOB_TITLE', ',' , 'NA')).otherwise(data.JOB_TITLE))

In [31]:
data.select('WORKSITE').show()

+--------------------+
|            WORKSITE|
+--------------------+
|ANN ARBORNA MICHIGAN|
|       PLANONA TEXAS|
|JERSEY CITYNA NEW...|
|   DENVERNA COLORADO|
|ST. LOUISNA MISSOURI|
|     MIAMINA FLORIDA|
|     HOUSTONNA TEXAS|
|SAN JOSENA CALIFO...|
|     MEMPHISNA TEXAS|
|   VIENNANA VIRGINIA|
|PITTSBURGHNA PENN...|
|  MIDLANDNA MICHIGAN|
|FAIRHAVENNA MASSA...|
|     MIAMINA FLORIDA|
|GREENWOOD VILLAGE...|
| STERLINGNA VIRGINIA|
|WAUKESHANA WISCONSIN|
|LOS ANGELESNA CAL...|
|SANTA CLARANA CAL...|
|ALEXANDRIANA VIRG...|
+--------------------+
only showing top 20 rows



In [32]:
data=data.withColumn('WORKSITE', when(col('WORKSITE').like("%.%"), regexp_replace('WORKSITE', '.', 'NA')).otherwise(data.WORKSITE))

In [35]:
data.select('lon').show()

+------------+
|         lon|
+------------+
| -83.7430378|
| -96.6988856|
| -74.0776417|
| -104.990251|
| -90.1994042|
| -80.1917902|
| -95.3698028|
|-121.8863286|
|           0|
| -77.2652604|
| -79.9958864|
| -84.2472116|
|           0|
| -80.1917902|
|-104.9508141|
| -77.4291298|
| -88.2314813|
|-118.2436849|
|-121.9552356|
| -77.0469214|
+------------+
only showing top 20 rows



In [125]:
data=data.withColumn('lon',data['lon'].cast(StringType()))

In [34]:
data=data.withColumn('lon', when(data.lon == 'NA', 0).otherwise(data.lon))

In [133]:
data=data.withColumn('lon',data['lon'].cast(DoubleType()))

In [131]:
data=data.withColumn('lat',data['lat'].cast(StringType()))

In [36]:
data=data.withColumn('lat', when(data.lat == 'NA', 0).otherwise(data.lat))

In [134]:
data=data.withColumn('lat',data['lat'].cast(DoubleType()))

In [37]:
data.describe().show()

+-------+-----------------+-----------+--------------------+--------------------+--------------------+------------------+-----------------+--------------------+--------------------+------------------+-----------------+
|summary|           ROW_ID|CASE_STATUS|       EMPLOYER_NAME|            SOC_NAME|           JOB_TITLE|FULL_TIME_POSITION|  PREVAILING_WAGE|                YEAR|            WORKSITE|               lon|              lat|
+-------+-----------------+-----------+--------------------+--------------------+--------------------+------------------+-----------------+--------------------+--------------------+------------------+-----------------+
|  count|          3002458|    3002458|             3002458|             3002458|             3002458|           3002458|          3002458|             3002458|             3002458|           3002458|          3002451|
|   mean|        1501229.5|       null|       3.218588665E8|                null|   263785.4022988506| 68509.25714285714|146

In [50]:
#. Find the count of distinct values in each column.

from pyspark.sql.functions import col, countDistinct
data.agg(countDistinct(col("CASE_STATUS")).alias("count")).show()
data.agg(countDistinct(col("EMPLOYER_NAME")).alias("count")).show()
data.agg(countDistinct(col("SOC_NAME")).alias("count")).show()
data.agg(countDistinct(col("JOB_TITLE")).alias("count")).show()
data.agg(countDistinct(col("FULL_TIME_POSITION")).alias("count")).show()
data.agg(countDistinct(col("WORKSITE")).alias("count")).show()

+-----+
|count|
+-----+
|    8|
+-----+

+-----+
|count|
+-----+
|    8|
+-----+

+-----+
|count|
+-----+
|    8|
+-----+

+-----+
|count|
+-----+
|    8|
+-----+



In [77]:
#5 List EMPLOYER_NAME and YEAR in the descending order of the Approved
#applications count (Approved applications are obtained using
#CASE_STATUS = 'CERTIFIED').

data.select('EMPLOYER_NAME','YEAR').where(col('CASE_STATUS')=='CERTIFIED').show()

+--------------------+----+
|       EMPLOYER_NAME|YEAR|
+--------------------+----+
|      QUICKLOGIX LLC|2016|
|           CERTIFIED|2016|
|           CERTIFIED|2016|
|  NANANANANANANANANA|2016|
|           CERTIFIED|2016|
|  NANANANANANANANANA|2016|
|          UMBEL CORP|2016|
|           CERTIFIED|2016|
|  NANANANANANANANANA|2016|
|THE KRAFT HEINZ C...|2016|
|           CERTIFIED|2016|
|VMS COMMUNICATION...|2016|
|           CERTIFIED|2016|
|       LABEL INSIGHT|2016|
|INN AT THE WICKLI...|2016|
|           CERTIFIED|2016|
|  CB LANSING 300 LLP|2016|
|           CERTIFIED|2016|
|           CERTIFIED|2016|
|           CERTIFIED|2016|
+--------------------+----+
only showing top 20 rows



In [52]:
#Observe the above results and list the EMPLOYER_NAME that had the
#maximum approved applications for each year for 2012, 2013, 2014, 2015 and
#2016?

data.select(('EMPLOYER_NAME')).where(col('CASE_STATUS')=='CERTIFIED').group by('year').show()

SyntaxError: invalid syntax (<ipython-input-52-017eebc09cd8>, line 5)

In [78]:
datapproved=data.select('EMPLOYER_NAME','YEAR','JOB_TITLE').where(col('CASE_STATUS')=='CERTIFIED').show()

+--------------------+----+--------------------+
|       EMPLOYER_NAME|YEAR|           JOB_TITLE|
+--------------------+----+--------------------+
|      QUICKLOGIX LLC|2016|                 CEO|
|           CERTIFIED|2016|PRESIDENTNA NORTH...|
|           CERTIFIED|2016|                 CEO|
|  NANANANANANANANANA|2016|CHIEF FINANCIAL O...|
|           CERTIFIED|2016|                 CEO|
|  NANANANANANANANANA|2016|CHIEF BUSINESS OF...|
|          UMBEL CORP|2016|VICE PRESIDENT OF...|
|           CERTIFIED|2016|  EXECUTIVE DIRECTOR|
|  NANANANANANANANANA|2016|VICE PRESIDENTNA ...|
|THE KRAFT HEINZ C...|2016|    HEAD OF US SALES|
|           CERTIFIED|2016|CHIEF FINANCIAL O...|
|VMS COMMUNICATION...|2016|CHIEF OPERATING O...|
|           CERTIFIED|2016|VICE PRESIDENT OF...|
|       LABEL INSIGHT|2016|CHIEF EXECUTIVE O...|
|INN AT THE WICKLI...|2016|CHIEF EXECUTIVE O...|
|           CERTIFIED|2016|VICE PRESIDENTNA ...|
|  CB LANSING 300 LLP|2016|CHIEF OPERATING O...|
|           CERTIFIE

In [91]:
#List the approved applications count in the descending order for the
#JOB_TITLE = "DATA SCIENTIST" and for each employer and year.
data.select('EMPLOYER_NAME','YEAR').where( col('JOB_TITLE') == 'DATA SCIENTIST' ).show()

+--------------------+----+
|       EMPLOYER_NAME|YEAR|
+--------------------+----+
|           CERTIFIED|2016|
|           CERTIFIED|2016|
|           CERTIFIED|2016|
|           CERTIFIED|2016|
|           CERTIFIED|2016|
|           CERTIFIED|2016|
|  NANANANANANANANANA|2016|
|CERTIFIEDNAWITHDRAWN|2016|
|APPLIED DATA FINA...|2016|
|APPLIED DATA FINA...|2016|
|APPLIED DATA FINA...|2016|
|           WITHDRAWN|2016|
|CERTIFIEDNAWITHDRAWN|2016|
|           CERTIFIED|2016|
|    HASH FINANCE LLC|2016|
|GEISINGER SYSTEM ...|2016|
|CERTIFIEDNAWITHDRAWN|2016|
|LINKEDIN CORPORATION|2016|
|LINKEDIN CORPORATION|2016|
|CERTIFIEDNAWITHDRAWN|2016|
+--------------------+----+
only showing top 20 rows



In [99]:
data=data.withColumn('PREVAILING_WAGE',data['PREVAILING_WAGE'].cast(DoubleType()))  

#PREVAILING_WAGE

In [107]:
#8. Find the null values count in each column.
#. Verify the null values count in each column
from pyspark.sql.functions import isnan, when, count, col

data.select([count(when((c), c)).alias(c) for c in data.columns]).show()

TypeError: condition should be a Column

In [92]:
#Remove all the rows with null values (in any column/position).
data = data.where(data.lon==0).drop()

In [93]:
data = data.where(data.lat==0).drop()

In [96]:
#. Verify the null values count in each column
from pyspark.sql.functions import isnan, when, count, col

data.select([count(when(isnan(c), c)).alias(c) for c in data.columns]).show()


+------+-----------+-------------+--------+---------+------------------+---------------+----+--------+---+---+
|ROW_ID|CASE_STATUS|EMPLOYER_NAME|SOC_NAME|JOB_TITLE|FULL_TIME_POSITION|PREVAILING_WAGE|YEAR|WORKSITE|lon|lat|
+------+-----------+-------------+--------+---------+------------------+---------------+----+--------+---+---+
|     0|          0|            0|       0|        0|                 0|              0|   0|       0|  0|  0|
+------+-----------+-------------+--------+---------+------------------+---------------+----+--------+---+---+



In [None]:
#List the count of applications in each status (CASE_STATUS) in the descending
#order of the year.
