In [1]:
from pyspark.sql import SparkSession
import findspark

findspark.init()
spark = SparkSession.builder.appName('challenge').config('spark.sql.legacy.timeParserPolicy','LEGACY').getOrCreate()

In [2]:
data_2 = spark.read.csv('./salary_data/data_2.csv', header=True, multiLine=True)

In [3]:
data_2.printSchema()

root
 |-- Timestamp: string (nullable = true)
 |-- Employment Type: string (nullable = true)
 |-- Company Name: string (nullable = true)
 |-- Company Size - # Employees: string (nullable = true)
 |-- Primary Location (Country): string (nullable = true)
 |-- Primary Location (City): string (nullable = true)
 |-- Industry in Company: string (nullable = true)
 |-- Public or Private Company: string (nullable = true)
 |-- Years Experience in Industry: string (nullable = true)
 |-- Years of Experience in Current Company  : string (nullable = true)
 |-- Job Title In Company: string (nullable = true)
 |-- Job Ladder: string (nullable = true)
 |-- Job Level: string (nullable = true)
 |-- Required Hours Per Week: string (nullable = true)
 |-- Actual Hours Per Week: string (nullable = true)
 |-- Highest Level of Formal Education Completed: string (nullable = true)
 |-- Total Base Salary in 2018 (in USD): string (nullable = true)
 |-- Total Bonus in 2018 (cumulative annual value in USD): string (n

In [4]:
from functools import reduce
better_names = ['timestamp','employment_type','company','num_employees','country','city','industry','public_or_private','years_experience_industry','years_experience_current_company','job_title','job_ladder','job_level','required_hours_per_week','actual_hours_per_week','education_level','total_base_salary_2018','total_bonus_2018','total_equity_2018','health_insurance_offered','annual_vacation_in_weeks','happiness','plan_to_resign','direction_opinion','gender','top_skills','bootcamp']
data_2 = reduce(lambda df, params: df.withColumnRenamed(*params), zip(data_2.columns, better_names), data_2)

In [5]:
data_2.createOrReplaceTempView('data_2')
data_2.printSchema()

root
 |-- timestamp: string (nullable = true)
 |-- employment_type: string (nullable = true)
 |-- company: string (nullable = true)
 |-- num_employees: string (nullable = true)
 |-- country: string (nullable = true)
 |-- city: string (nullable = true)
 |-- industry: string (nullable = true)
 |-- public_or_private: string (nullable = true)
 |-- years_experience_industry: string (nullable = true)
 |-- years_experience_current_company: string (nullable = true)
 |-- job_title: string (nullable = true)
 |-- job_ladder: string (nullable = true)
 |-- job_level: string (nullable = true)
 |-- required_hours_per_week: string (nullable = true)
 |-- actual_hours_per_week: string (nullable = true)
 |-- education_level: string (nullable = true)
 |-- total_base_salary_2018: string (nullable = true)
 |-- total_bonus_2018: string (nullable = true)
 |-- total_equity_2018: string (nullable = true)
 |-- health_insurance_offered: string (nullable = true)
 |-- annual_vacation_in_weeks: string (nullable = tr

In [6]:
spark.sql('''
    SELECT to_timestamp(timestamp, 'M/d/yyyy H:mm:ss') timestamp
    FROM data_2
''').show(truncate=False)

+-------------------+
|timestamp          |
+-------------------+
|null               |
|2019-09-11 00:47:40|
|2019-09-11 00:51:26|
|2019-09-11 00:51:56|
|2019-09-11 00:52:28|
|2019-09-11 00:54:02|
|2019-09-11 00:54:26|
|2019-09-11 00:55:34|
|2019-09-11 00:55:57|
|2019-09-11 00:56:14|
|2019-09-11 00:56:35|
|2019-09-11 00:56:42|
|2019-09-11 00:57:09|
|2019-09-11 00:57:11|
|2019-09-11 00:57:30|
|2019-09-11 00:58:26|
|2019-09-11 00:59:05|
|2019-09-11 00:59:14|
|2019-09-11 01:00:11|
|2019-09-11 01:00:28|
+-------------------+
only showing top 20 rows



In [7]:
spark.sql('''
    SELECT regexp_replace(total_base_salary_2018, '[.]', '') || CASE length(regexp_extract(total_base_salary_2018, '[0-9]+[.]([0-9]+)'))
                WHEN 0 THEN '00'
                WHEN 1 THEN '0'
                WHEN 2 THEN ''
                WHEN 3 THEN '00'
           END AS salary
    FROM data_2
''').show(1000000, truncate=False)

+--------------+
|salary        |
+--------------+
|null          |
|18000000      |
|15100000      |
|6300000       |
|10500000      |
|4576000       |
|6300000       |
|6000000       |
|10500000      |
|6180000       |
|6665000       |
|4800000       |
|11000000      |
|2800000       |
|7650000       |
|6000000       |
|3800000       |
|2800000       |
|5500000       |
|6500000       |
|10700000      |
|7500000       |
|1900000       |
|4200000       |
|6000000       |
|9000000       |
|7600000       |
|5100000       |
|6675000       |
|5150000       |
|10000000      |
|480000        |
|5200000       |
|10300000      |
|4800000       |
|5500000       |
|5000000       |
|9000000       |
|9500000       |
|4000000       |
|21500000      |
|11500000      |
|8000000       |
|7952700       |
|7500000       |
|11800000      |
|4210000       |
|10200000      |
|3300000       |
|6000000       |
|7900000       |
|400000        |
|5000000       |
|9100000       |
|12500000      |
|10400000     

In [8]:
spark.sql('''
    SELECT total_base_salary_2018, country
    FROM data_2
    WHERE length(regexp_extract(total_base_salary_2018, '[0-9]+[.]([0-9]+)')) = 3
''').show(1000000, truncate=False)

+----------------------+----------------+
|total_base_salary_2018|country         |
+----------------------+----------------+
|26.514                |Germany (DE)    |
|47.415                |Germany (DE)    |
|22.944                |Netherlands (NL)|
|22.889                |Italy (IT)      |
|65.999                |Italy (IT)      |
|28.622                |Spain (ES)      |
|66.924                |Germany (DE)    |
+----------------------+----------------+



In [9]:
spark.sql('''
    SELECT distinct total_bonus_2018
    FROM data_2
''').show(1000000, truncate=False)

+----------------+
|total_bonus_2018|
+----------------+
|800             |
|28500           |
|14000           |
|3200            |
|1361            |
|53000           |
|1500            |
|18750           |
|29553           |
|150000          |
|2200            |
|3250            |
|15              |
|700             |
|87000           |
|14243.07        |
|12420           |
|21000           |
|200             |
|1236            |
|11000           |
|110000          |
|219000          |
|26000           |
|1100            |
|11500           |
|5000            |
|3               |
|75000           |
|8500            |
|82000           |
|1300            |
|4200            |
|90000           |
|250             |
|120000          |
|2100            |
|352             |
|41600           |
|18600           |
|8               |
|5575            |
|9919            |
|17676           |
|1277            |
|1482            |
|6700            |
|343             |
|22500           |
|6100       

In [10]:
spark.sql('''
    SELECT DISTINCT country, city
    FROM data_2
    ORDER BY 1
''').show(1000000, truncate=False)

+-----------------------------------------+-------------------------------------------------------------------------+
|country                                  |city                                                                     |
+-----------------------------------------+-------------------------------------------------------------------------+
|null                                     |null                                                                     |
|Angola (AO)                              |null                                                                     |
|Anguilla (AI)                            |Wb                                                                       |
|Argentina (AR)                           |Buenos Aires                                                             |
|Argentina (AR)                           |null                                                                     |
|Aruba (AW)                               |null         

In [11]:
spark.sql('''
    SELECT DISTINCT years_experience_industry
    FROM data_2
''').show(1000000, truncate=False)

+-------------------------+
|years_experience_industry|
+-------------------------+
|2-5                      |
|null                     |
|20+                      |
|0-2                      |
|5-10                     |
|10-20                    |
+-------------------------+



In [12]:
spark.sql('''
    SELECT initcap(job_title), count(1)
    FROM data_2
    GROUP BY 1
    ORDER BY 2 DESC
''').show(1000000, truncate=False)

+-------------------------------------------------------------+--------+
|initcap(job_title)                                           |count(1)|
+-------------------------------------------------------------+--------+
|Systems Administrator                                        |196     |
|Software Engineer                                            |166     |
|System Administrator                                         |133     |
|It Manager                                                   |105     |
|Systems Engineer                                             |105     |
|Network Engineer                                             |69      |
|Senior Software Engineer                                     |63      |
|Network Administrator                                        |52      |
|Devops Engineer                                              |46      |
|Software Developer                                           |45      |
|Sysadmin                                          

In [13]:
def normalize_adhoc_money_quantity_query_expression(src_column: str, dest_column: str) -> str:
    return f"""
        CAST(
            CAST(regexp_replace({src_column}, '[.]', '') || CASE length(regexp_extract({src_column}, '[0-9]+[.]([0-9]+)'))
                WHEN 0 THEN '00'
                WHEN 1 THEN '0'
                WHEN 2 THEN ''
                WHEN 3 THEN '00'
            END AS LONG) / 100
            AS DECIMAL(15,2)
        ) AS {dest_column}
    """

salary_expr = normalize_adhoc_money_quantity_query_expression('total_base_salary_2018', 'annual_salary')

bonus_expr = normalize_adhoc_money_quantity_query_expression('total_bonus_2018', 'annual_bonus')

timestamp_expr = "to_timestamp(timestamp, 'M/d/yyyy H:mm:ss') AS timestamp"

country_expr = "trim(country) AS country"

city_expr = "trim(city) AS city"

industry_expr = "trim(industry) AS industry"

years_experience_industry_expr = "trim(years_experience_industry) AS experience_range"

job_title_expr = "trim(job_title) AS job_title"

final_df = spark.sql(f'''
    SELECT {timestamp_expr},
           {salary_expr},
           {bonus_expr},
           {job_title_expr},
           {country_expr},
           {city_expr},
           {industry_expr},
           {years_experience_industry_expr}
    FROM data_2
''')

final_df.printSchema()

final_df.show()

root
 |-- timestamp: timestamp (nullable = true)
 |-- annual_salary: decimal(15,2) (nullable = true)
 |-- annual_bonus: decimal(15,2) (nullable = true)
 |-- job_title: string (nullable = true)
 |-- country: string (nullable = true)
 |-- city: string (nullable = true)
 |-- industry: string (nullable = true)
 |-- experience_range: string (nullable = true)

+-------------------+-------------+------------+--------------------+-------------------+-----------+--------------------+----------------+
|          timestamp|annual_salary|annual_bonus|           job_title|            country|       city|            industry|experience_range|
+-------------------+-------------+------------+--------------------+-------------------+-----------+--------------------+----------------+
|               null|         null|        null|                null|               null|       null|                null|            null|
|2019-09-11 00:47:40|    180000.00|    40000.00|Senior Vice Presi...| United States

In [14]:
final_df.write.parquet('./optimized_data/data_2')