In [43]:
from pyspark.sql import SparkSession
from pyspark import SparkConf
from pyspark.sql.functions import *

import pyspark.sql.functions as F
import configparser

# Read AWS credentials form aws.cfg and store in variables.

In [44]:
config = configparser.ConfigParser()
config.read_file(open('aws.cfg'))

AWS_ACCESS_KEY_ID = config.get('AWS', 'AWS_ACCESS_KEY_ID')
AWS_SECRET_ACCESS_KEY = config.get('AWS', 'AWS_SECRET_ACCESS_KEY')

# Create SparkSessson

In [45]:
conf = SparkConf() \
        .set('spark.jars.packages', 'org.apache.hadoop:hadoop-aws:3.2.0') \
        .set('spark.hadoop.fs.s3a.aws.credentials.provider', 'org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider') \
        .set('spark.hadoop.fs.s3a.access.key', AWS_ACCESS_KEY_ID) \
        .set('spark.hadoop.fs.s3a.secret.key', AWS_SECRET_ACCESS_KEY) \
    

spark = SparkSession.builder \
                    .master('local') \
                    .config(conf=conf) \
                    .appName('test-spark') \
                    .getOrCreate()

# Using spark Reads file form S3

* Set our S3 bucket

In [46]:
bucket = 's3-redshift-airflow-project'

* Set our csv file name

In [47]:
file = 'DataEngineer.csv'

* Read file from S3

In [48]:
df = spark.read.csv(f's3a://{bucket}/{file}', header=True)

# Checking data

In [49]:
df.limit(1).show()

+-------------+--------------------+--------------------+------+------------+------------+------------+-----------------+-------+-----------------+----------+-----------------+--------------------+-------------------+----------+
|    Job Title|     Salary Estimate|     Job Description|Rating|Company Name|    Location|Headquarters|             Size|Founded|Type of ownership|  Industry|           Sector|             Revenue|        Competitors|Easy Apply|
+-------------+--------------------+--------------------+------+------------+------------+------------+-----------------+-------+-----------------+----------+-----------------+--------------------+-------------------+----------+
|Data Engineer|$80K-$150K (Glass...|Company Descripti...|   4.5| Sagence 4.5|New York, NY| Chicago, IL|1 to 50 employees|   2009|Company - Private|Consulting|Business Services|$10 to $25 millio...|WCI Consulting, PwC|        -1|
+-------------+--------------------+--------------------+------+------------+-------

In [50]:
df.printSchema()

root
 |-- Job Title: string (nullable = true)
 |-- Salary Estimate: string (nullable = true)
 |-- Job Description: string (nullable = true)
 |-- Rating: string (nullable = true)
 |-- Company Name: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Headquarters: string (nullable = true)
 |-- Size: string (nullable = true)
 |-- Founded: string (nullable = true)
 |-- Type of ownership: string (nullable = true)
 |-- Industry: string (nullable = true)
 |-- Sector: string (nullable = true)
 |-- Revenue: string (nullable = true)
 |-- Competitors: string (nullable = true)
 |-- Easy Apply: string (nullable = true)



In [51]:
df.count()

2528

# Drop unnecessary columns

In [52]:
df = df.drop('Size') \
        .drop('Type of ownership') \
        .drop('Job Description') \
        .drop('Revenue') \
        .drop('Competitors') \
        .drop('Easy Apply') \
        .drop('Sector')

In [53]:
df.limit(1).show()

+-------------+--------------------+------+------------+------------+------------+-------+----------+
|    Job Title|     Salary Estimate|Rating|Company Name|    Location|Headquarters|Founded|  Industry|
+-------------+--------------------+------+------------+------------+------------+-------+----------+
|Data Engineer|$80K-$150K (Glass...|   4.5| Sagence 4.5|New York, NY| Chicago, IL|   2009|Consulting|
+-------------+--------------------+------+------------+------------+------------+-------+----------+



# Rename columns to lowercase and replace white space to underscore

In [54]:
df = df.toDF(*(c.replace(' ', '_').lower() for c in df.columns))

In [55]:
df.limit(1).show()

+-------------+--------------------+------+------------+------------+------------+-------+----------+
|    job_title|     salary_estimate|rating|company_name|    location|headquarters|founded|  industry|
+-------------+--------------------+------+------------+------------+------------+-------+----------+
|Data Engineer|$80K-$150K (Glass...|   4.5| Sagence 4.5|New York, NY| Chicago, IL|   2009|Consulting|
+-------------+--------------------+------+------------+------------+------------+-------+----------+



# Split city and state code from location, headquarters
* some values in headquarters column are country, it's fine we can use that for analysis too

In [56]:
df = df.withColumn('location_city', split(df['location'], ',').getItem(0)) \
                .withColumn('location_state', split(df['location'], ',').getItem(1)) \
                .withColumn('hq_city', split(df['headquarters'], ',').getItem(0)) \
                .withColumn('hq_state', split(df['headquarters'], ',').getItem(1)) \
                .drop('location', 'headquarters')

In [57]:
df.limit(1).show()

+-------------+--------------------+------+------------+-------+----------+-------------+--------------+-------+--------+
|    job_title|     salary_estimate|rating|company_name|founded|  industry|location_city|location_state|hq_city|hq_state|
+-------------+--------------------+------+------------+-------+----------+-------------+--------------+-------+--------+
|Data Engineer|$80K-$150K (Glass...|   4.5| Sagence 4.5|   2009|Consulting|     New York|            NY|Chicago|      IL|
+-------------+--------------------+------+------------+-------+----------+-------------+--------------+-------+--------+



# Rework salary_estimate column

In [58]:
df = df.withColumn('salary_estimate', regexp_replace('salary_estimate', '[$a-jl-zA-JL-Z.()]', '')) \
                .withColumn('salary_min', split('salary_estimate', '-').getItem(0)) \
                .withColumn('salary_min', regexp_replace('salary_min', 'K', '000')) \
                .withColumn('salary_max', split('salary_estimate', '-').getItem(1)) \
                .withColumn('salary_max', regexp_replace('salary_max', 'K', '000')) \
                .withColumn('salary_max', regexp_replace('salary_max', ' ', '')) \
                .drop('salary_estimate')

In [59]:
df = df.withColumn('salary_min', df.salary_min.cast('int')) \
                .withColumn('salary_max', df.salary_max.cast('int')) \
                .withColumn('rating', df.rating.cast('float')) \
                .withColumn('founded', df.founded.cast('int'))

In [60]:
df.limit(1).show()

+-------------+------+------------+-------+----------+-------------+--------------+-------+--------+----------+----------+
|    job_title|rating|company_name|founded|  industry|location_city|location_state|hq_city|hq_state|salary_min|salary_max|
+-------------+------+------------+-------+----------+-------------+--------------+-------+--------+----------+----------+
|Data Engineer|   4.5| Sagence 4.5|   2009|Consulting|     New York|            NY|Chicago|      IL|     80000|    150000|
+-------------+------+------------+-------+----------+-------------+--------------+-------+--------+----------+----------+



In [61]:
df.printSchema()

root
 |-- job_title: string (nullable = true)
 |-- rating: float (nullable = true)
 |-- company_name: string (nullable = true)
 |-- founded: integer (nullable = true)
 |-- industry: string (nullable = true)
 |-- location_city: string (nullable = true)
 |-- location_state: string (nullable = true)
 |-- hq_city: string (nullable = true)
 |-- hq_state: string (nullable = true)
 |-- salary_min: integer (nullable = true)
 |-- salary_max: integer (nullable = true)



# Remove rating from company_name column (last 4 characters)

In [62]:
df = df.withColumn('company_name',expr('substring(company_name, 1, length(company_name)-4)'))

In [63]:
df.limit(1).show()

+-------------+------+------------+-------+----------+-------------+--------------+-------+--------+----------+----------+
|    job_title|rating|company_name|founded|  industry|location_city|location_state|hq_city|hq_state|salary_min|salary_max|
+-------------+------+------------+-------+----------+-------------+--------------+-------+--------+----------+----------+
|Data Engineer|   4.5|     Sagence|   2009|Consulting|     New York|            NY|Chicago|      IL|     80000|    150000|
+-------------+------+------------+-------+----------+-------------+--------------+-------+--------+----------+----------+



# Reorder columns

In [64]:
df = df.select('job_title', 'salary_min', 'salary_max', 'company_name', 'rating', 'location_city', 'location_state', \
                      'hq_city', 'hq_state', 'founded', 'industry')

In [65]:
df.printSchema()

root
 |-- job_title: string (nullable = true)
 |-- salary_min: integer (nullable = true)
 |-- salary_max: integer (nullable = true)
 |-- company_name: string (nullable = true)
 |-- rating: float (nullable = true)
 |-- location_city: string (nullable = true)
 |-- location_state: string (nullable = true)
 |-- hq_city: string (nullable = true)
 |-- hq_state: string (nullable = true)
 |-- founded: integer (nullable = true)
 |-- industry: string (nullable = true)



# Check and remove Null values

In [66]:
df.count()

2528

In [67]:
df = df.filter('job_title IS NOT NULL AND \
                salary_min IS NOT NULL AND \
                salary_max IS NOT NULL AND \
                company_name IS NOT NULL AND \
                rating IS NOT NULL AND \
                location_city IS NOT NULL AND \
                location_state IS NOT NULL AND \
                hq_city IS NOT NULL AND \
                hq_state IS NOT NULL AND \
                founded IS NOT NULL AND \
                industry IS NOT NULL')

In [68]:
df.count()

2183

# Remove invalid values in columns (values are -1)

In [69]:
df = df.filter(df.job_title != '-1')
df = df.filter(df.salary_min != -1)
df = df.filter(df.salary_max != -1)
df = df.filter(df.company_name != '-1')
df = df.filter(df.rating != -1.0)
df = df.filter(df.location_city != '-1')
df = df.filter(df.location_state != '-1')
df = df.filter(df.hq_city != '-1')
df = df.filter(df.hq_state != '-1')
df = df.filter(df.founded != -1)
df = df.filter(df.industry != '-1')

In [70]:
df.count()

1713

In [71]:
df.limit(5).toPandas()

Unnamed: 0,job_title,salary_min,salary_max,company_name,rating,location_city,location_state,hq_city,hq_state,founded,industry
0,Data Engineer,80000,150000,Sagence,4.5,New York,NY,Chicago,IL,2009,Consulting
1,Senior Data Engineer (Healthcare Domain experi...,80000,150000,Enterprise Integration,3.4,New York,NY,Jacksonville,FL,1998,IT Services
2,Data Engineers,80000,150000,Maestro Technologies,5.0,New York,NY,Trenton,NJ,2003,IT Services
3,Client Trade Support Engineer,80000,150000,Jane Street,4.8,New York,NY,New York,NY,2000,Investment Banking & Asset Management
4,Operations Engineer,80000,150000,Oscar Health,3.7,New York,NY,New York,NY,2012,Insurance Agencies & Brokerages


# Data quality check

* Check Null values

In [72]:
columns_list = df.columns
print(columns_list)

['job_title', 'salary_min', 'salary_max', 'company_name', 'rating', 'location_city', 'location_state', 'hq_city', 'hq_state', 'founded', 'industry']


In [73]:
for column in columns_list:
    result = df.filter(column + ' is Null').count()
    if result == 0:
        print(f"{column} column has no Null value")
    else:
        print(f"{column} column has {result} Null value.")

job_title column has no Null value
salary_min column has no Null value
salary_max column has no Null value
company_name column has no Null value
rating column has no Null value
location_city column has no Null value
location_state column has no Null value
hq_city column has no Null value
hq_state column has no Null value
founded column has no Null value
industry column has no Null value


* Check invalid values (-1)

In [74]:
for column in columns_list:
    if column == 'salary_min':
        result = df.filter(f"{column} == -1").count()
        print(f"{column} column has {result} invalid value")
    if column == 'salary_max':
        result = df.filter(f"{column} == -1").count()
        print(f"{column} column has {result} invalid value")
    if column == 'rating':
        rresult = df.filter(f"{column} == -1.0").count()
        print(f"{column} column has {result} invalid value")
    if column == 'founded':
        result = df.filter(f"{column} == -1").count()
        print(f"{column} column has {result} invalid value")
    else:
        result = df.filter(f"{column} == '-1'").count()
        print(f"{column} column has {result} invalid value")

job_title column has 0 invalid value
salary_min column has 0 invalid value
salary_min column has 0 invalid value
salary_max column has 0 invalid value
salary_max column has 0 invalid value
company_name column has 0 invalid value
rating column has 0 invalid value
rating column has 0 invalid value
location_city column has 0 invalid value
location_state column has 0 invalid value
hq_city column has 0 invalid value
hq_state column has 0 invalid value
founded column has 0 invalid value
industry column has 0 invalid value


# Write spark datafram to Redshift cluster that we have created.

* Read Redshift informations form aws.cfg and store in variables.

In [75]:
config = configparser.ConfigParser()
config.read_file(open('aws.cfg'))

DWH_ENDPOINT = config.get('DWH', 'DWH_ENDPOINT')
DWH_PORT = config.get('DWH', 'DWH_PORT')
DWH_DATABASE = config.get('DWH', 'DWH_DATABASE')
DWH_DATABASE_USER = config.get('DWH', 'DWH_DATABASE_USER')
DWH_DATABASE_PASSWORD = config.get('DWH', 'DWH_DATABASE_PASSWORD')

* Give table name we want to create in Redshift cluster

In [76]:
TABLE = 'de_job'

* Write spark dataframe to Redshift cluster

In [77]:
df.write \
    .format("jdbc") \
    .option("url", "jdbc:redshift://" + DWH_ENDPOINT + ":" + DWH_PORT + "/" + DWH_DATABASE) \
    .option("driver", "com.amazon.redshift.jdbc42.Driver") \
    .option("dbtable", TABLE) \
    .option("user", DWH_DATABASE_USER) \
    .option("password", DWH_DATABASE_PASSWORD)\
    .save(mode='overwrite')

# Checking data that we have loaded into Redshift cluster

In [78]:
df = spark.read.format("jdbc") \
                .option("url", "jdbc:redshift://" + DWH_ENDPOINT + ":" + DWH_PORT + "/" + DWH_DATABASE) \
                .option("driver", "com.amazon.redshift.jdbc42.Driver") \
                .option("dbtable", TABLE) \
                .option("user", DWH_DATABASE_USER) \
                .option("password", DWH_DATABASE_PASSWORD) \
                .load()

In [79]:
df.count()

1713

In [80]:
df.limit(5).toPandas()

Unnamed: 0,job_title,salary_min,salary_max,company_name,rating,location_city,location_state,hq_city,hq_state,founded,industry
0,Data Engineer,80000,150000,Sagence,4.5,New York,NY,Chicago,IL,2009,Consulting
1,Senior Data Engineer (Healthcare Domain experi...,80000,150000,Enterprise Integration,3.4,New York,NY,Jacksonville,FL,1998,IT Services
2,Data Engineers,80000,150000,Maestro Technologies,5.0,New York,NY,Trenton,NJ,2003,IT Services
3,Client Trade Support Engineer,80000,150000,Jane Street,4.8,New York,NY,New York,NY,2000,Investment Banking & Asset Management
4,Operations Engineer,80000,150000,Oscar Health,3.7,New York,NY,New York,NY,2012,Insurance Agencies & Brokerages
