In [12]:
from pyspark.sql import SparkSession

# Column names shared by the cells below.
AGE_MIDPOINT = "age_midpoint"
SALARY_MIDPOINT = "salary_midpoint"
# Name of the derived column that a later cell adds with withColumn().
SALARY_MIDPOINT_BUCKET = "salary_midpoint_bucket"


# Obtain (or reuse) the Spark session for this notebook.
sessionBuilder = SparkSession.builder.appName("StackOverFlowSurvey")
session = sessionBuilder.getOrCreate()

# Load the survey CSV: column names come from the header row and column
# types are inferred from the data instead of defaulting to strings.
dataFrameReader = session.read
responses = (
    dataFrameReader
    .option("header", "true")
    .option("inferSchema", True)
    .csv("../input_data/2016-stack-overflow-survey-responses.csv")
)

print("================================= Print out schema ===")
responses.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- collector: string (nullable = true)
 |-- country: string (nullable = true)
 |-- un_subregion: string (nullable = true)
 |-- so_region: string (nullable = true)
 |-- age_range: string (nullable = true)
 |-- age_midpoint: double (nullable = true)
 |-- gender: string (nullable = true)
 |-- self_identification: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- occupation_group: string (nullable = true)
 |-- experience_range: string (nullable = true)
 |-- experience_midpoint: double (nullable = true)
 |-- salary_range: string (nullable = true)
 |-- salary_midpoint: double (nullable = true)
 |-- big_mac_index: double (nullable = true)
 |-- tech_do: string (nullable = true)
 |-- tech_want: string (nullable = true)
 |-- aliens: string (nullable = true)
 |-- programming_ability: double (nullable = true)
 |-- employment_status: string (nullable = true)
 |-- industry: string (nullable = true)
 |-- company_size_range: string (null

In [13]:
# Narrow the survey down to the four columns the remaining cells work with.
responseWithSelectedColumns = responses.select(
    "country",
    "occupation",
    AGE_MIDPOINT,
    SALARY_MIDPOINT,
)

print("======================================= Print the selected columns of the table ===")
# show() with no arguments prints only the first rows of the DataFrame.
responseWithSelectedColumns.show()

+-----------+--------------------+------------+---------------+
|    country|          occupation|age_midpoint|salary_midpoint|
+-----------+--------------------+------------+---------------+
|Afghanistan|                null|        22.0|           null|
|Afghanistan|Mobile developer ...|        32.0|        45000.0|
|Afghanistan|                null|        null|           null|
|Afghanistan|              DevOps|        null|         5000.0|
|Afghanistan|                null|        65.0|           null|
|Afghanistan|                null|        22.0|           null|
|Afghanistan|       Growth hacker|        null|       210000.0|
|Afghanistan|Back-end web deve...|        27.0|         5000.0|
|    Albania|                null|        27.0|           null|
|    Albania|Back-end web deve...|        22.0|         5000.0|
|    Albania|Full-stack web de...|        27.0|         5000.0|
|    Albania|Full-stack web de...|        22.0|        15000.0|
|    Albania|Full-stack web de...|      

In [4]:
print("=== Print records where the response is from Afghanistan ===")
# The chain is wrapped in parentheses so it can span several lines; without
# them a line beginning with ".filter" is a SyntaxError.
(
    responseWithSelectedColumns
    .filter(responseWithSelectedColumns["country"] == "Afghanistan")
    .show()
)

=== Print records where the response is from Afghanistan ===
+-----------+--------------------+------------+---------------+
|    country|          occupation|age_midpoint|salary_midpoint|
+-----------+--------------------+------------+---------------+
|Afghanistan|                null|        22.0|           null|
|Afghanistan|Mobile developer ...|        32.0|        45000.0|
|Afghanistan|                null|        null|           null|
|Afghanistan|              DevOps|        null|         5000.0|
|Afghanistan|                null|        65.0|           null|
|Afghanistan|                null|        22.0|           null|
|Afghanistan|       Growth hacker|        null|       210000.0|
|Afghanistan|Back-end web deve...|        27.0|         5000.0|
+-----------+--------------------+------------+---------------+



In [5]:
print("=== Print the count of occupations ===")
# Group the responses by occupation, then count the rows in each group.
groupedData = responseWithSelectedColumns.groupBy("occupation")
occupationCounts = groupedData.count()
occupationCounts.show()

=== Print the count of occupations ===
+--------------------+-----+
|          occupation|count|
+--------------------+-----+
|     Product manager|   18|
|Business intellig...|    8|
|Mobile developer ...|    3|
|System administrator|   34|
|             Student|  234|
|    Mobile developer|   60|
| Engineering manager|   22|
|                null|  297|
|            Designer|   15|
|Embedded applicat...|   33|
| Graphics programmer|   11|
|               other|   67|
|   Desktop developer|   87|
|Developer with a ...|   25|
|       Growth hacker|    5|
|   Quality Assurance|   10|
|             Analyst|   20|
|Full-stack web de...|  498|
|Mobile developer ...|   32|
|Machine learning ...|   10|
+--------------------+-----+
only showing top 20 rows



In [15]:
print("=== Print records with average mid age less than 20 ===")
# Keep only the rows whose age midpoint is strictly below 20.
(
    responseWithSelectedColumns
    .filter(responseWithSelectedColumns[AGE_MIDPOINT] < 20)
    .show()
)

=== Print records with average mid age less than 20 ===
+---------+--------------------+------------+---------------+
|  country|          occupation|age_midpoint|salary_midpoint|
+---------+--------------------+------------+---------------+
|  Algeria|             Student|        16.0|           null|
|  Algeria|Back-end web deve...|        16.0|           null|
|Argentina|             Student|        16.0|         5000.0|
|Argentina|Back-end web deve...|        16.0|         5000.0|
|  Armenia|Back-end web deve...|        16.0|         5000.0|
|  Armenia|                null|        16.0|           null|
|  Armenia|Mobile developer ...|        16.0|         5000.0|
|  Armenia|Mobile developer ...|        16.0|         5000.0|
|  Austria|Mobile developer ...|        16.0|           null|
|  Austria|Full-stack web de...|        16.0|           null|
|  Austria|Full-stack web de...|        16.0|        15000.0|
|  Austria|                null|        16.0|           null|
|  Austria|   

In [7]:
print("=== Print the result by salary middle point in descending order ===")
# Sort on the salary midpoint, highest first, and display the leading rows.
(
    responseWithSelectedColumns
    .orderBy(responseWithSelectedColumns[SALARY_MIDPOINT], ascending=False)
    .show()
)

=== Print the result by salary middle point in descending order ===
+------------------+--------------------+------------+---------------+
|           country|          occupation|age_midpoint|salary_midpoint|
+------------------+--------------------+------------+---------------+
|         Argentina|Back-end web deve...|        32.0|       210000.0|
|           Denmark|              DevOps|        44.5|       210000.0|
|         Argentina|Full-stack web de...|        27.0|       210000.0|
|           Denmark|Enterprise level ...|        32.0|       210000.0|
|Dominican Republic|Executive (VP of ...|        37.0|       210000.0|
|             China|Machine learning ...|        22.0|       210000.0|
|            France|Full-stack web de...|        32.0|       210000.0|
|           Denmark|Full-stack web de...|        22.0|       210000.0|
|       Afghanistan|       Growth hacker|        null|       210000.0|
|          Bulgaria|Enterprise level ...|        37.0|       195000.0|
|        

In [8]:
print("=== Group by country and aggregate by average salary middle point ===")
# One group per country; avg() yields a single "avg(salary_midpoint)" column.
dataGroupByCountry = responseWithSelectedColumns.groupBy("country")
averageSalaryByCountry = dataGroupByCountry.avg(SALARY_MIDPOINT)
averageSalaryByCountry.show()

=== Group by country and aggregate by average salary middle point ===
+------------------+--------------------+
|           country|avg(salary_midpoint)|
+------------------+--------------------+
|           Germany|  46491.228070175435|
|       Afghanistan|             66250.0|
|          Cambodia|              5000.0|
|            France|  39648.760330578516|
|           Algeria|             30000.0|
|         Argentina|  27950.819672131147|
|           Belgium|   45989.01098901099|
|           Ecuador|             40000.0|
|           Albania|   8333.333333333334|
|           Finland|   45714.28571428572|
|           Bahamas|             95000.0|
|             China|             54687.5|
|           Belarus|             10000.0|
|             Chile|  41666.666666666664|
|           Croatia|  14166.666666666666|
|           Andorra|             40000.0|
|           Bolivia|              5000.0|
|           Denmark|   68768.65671641791|
|        Bangladesh|   7307.692307692308|
|Bosni

In [9]:
# Width of each salary bucket, in the same units as salary_midpoint.
SALARY_BUCKET_WIDTH = 20000

# Divide by the bucket width, drop the fractional part via the integer cast,
# and scale back up: the result is the lower bound of the bucket the salary
# falls into (e.g. 45000.0 -> 40000). Null salaries give a null bucket.
salaryBucketLowerBound = (
    (responses[SALARY_MIDPOINT] / SALARY_BUCKET_WIDTH).cast("integer")
    * SALARY_BUCKET_WIDTH
)
responseWithSalaryBucket = responses.withColumn(
    SALARY_MIDPOINT_BUCKET,
    salaryBucketLowerBound,
)

print("=== With salary bucket column ===")
responseWithSalaryBucket.select(SALARY_MIDPOINT, SALARY_MIDPOINT_BUCKET).show()

=== With salary bucket column ===
+---------------+----------------------+
|salary_midpoint|salary_midpoint_bucket|
+---------------+----------------------+
|           null|                  null|
|        45000.0|                 40000|
|           null|                  null|
|         5000.0|                     0|
|           null|                  null|
|           null|                  null|
|       210000.0|                200000|
|         5000.0|                     0|
|           null|                  null|
|         5000.0|                     0|
|         5000.0|                     0|
|        15000.0|                     0|
|         5000.0|                     0|
|         5000.0|                     0|
|        15000.0|                     0|
|           null|                  null|
|           null|                  null|
|           null|                  null|
|           null|                  null|
|        15000.0|                     0|
+---------------+------

In [10]:
print("=== Group by salary bucket ===")
# Count the responses in each salary bucket and list the buckets in
# ascending order.
(
    responseWithSalaryBucket
    .groupBy(SALARY_MIDPOINT_BUCKET)
    .count()
    .orderBy(SALARY_MIDPOINT_BUCKET)
    .show()
)

=== Group by salary bucket ===
+----------------------+-----+
|salary_midpoint_bucket|count|
+----------------------+-----+
|                  null|  566|
|                     0|  523|
|                 20000|  351|
|                 40000|  260|
|                 60000|  134|
|                 80000|   63|
|                100000|   51|
|                120000|   23|
|                140000|   11|
|                160000|    5|
|                180000|    3|
|                200000|    9|
+----------------------+-----+



In [11]:
# Shut down the Spark session created in the first cell; the DataFrames built
# above cannot be used after this point.
session.stop()