In [1]:
import os
import re

from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp

# Single local Spark session shared by every stage of the notebook.
spark = SparkSession.builder.master("local").appName("ame-challenge").getOrCreate()

# Only report errors so cell outputs are not flooded with Spark log lines.
spark.sparkContext.setLogLevel("ERROR")

# Pipeline zones are resolved relative to the directory the notebook runs in.
repository_path = os.getcwd()
path_landing = f"{repository_path}/pipeline/landing"
path_bronze = f"{repository_path}/pipeline/bronze"
path_silver = f"{repository_path}/pipeline/silver"
path_gold = f"{repository_path}/pipeline/gold"

23/09/04 13:25:04 WARN Utils: Your hostname, pop-os resolves to a loopback address: 127.0.1.1; using 192.168.0.174 instead (on interface wlp7s0)
23/09/04 13:25:04 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/09/04 13:25:05 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


#### Bronze

In [2]:
# Read the sample landing file once, only to capture its column layout.
df_zipped = (
    spark.read
    .format("csv")
    .option("compression", "gzip")
    .option("delimiter", ",")
    .option("header", True)
    .load(f"{path_landing}/base_de_respostas_10k_amostra_csv.gz")
)

# Schema taken from the sample file; reused for every landing file below.
df_static_sch = df_zipped.schema

# Batch-read all landing files matching the pattern with that fixed schema.
df_landing_batch = (
    spark.read
    .schema(df_static_sch)
    .format("csv")
    .option("compression", "gzip")
    .option("sep", ",")
    .option("header", "true")
    .load(f'{path_landing}/base_de_respostas_*.gz')
)

# Stamp every row with the load time; silver uses it to pick the latest load.
df_ss_bronze = df_landing_batch.withColumn("loading_date_stage", current_timestamp())

# BRONZE
# Append the batch to the bronze zone as CSV with a header row.
(
    df_ss_bronze.write
    .mode("append")
    .option('header', 'true')
    .csv(f"{path_bronze}/base_de_respostas.csv")
)

                                                                                

#### Silver

In [3]:
# Load everything accumulated in the bronze zone.
# The previous version also set `compression=gzip` and `maxFilesPerTrigger`:
# bronze is written as plain CSV and the CSV `compression` option only applies
# when writing, while `maxFilesPerTrigger` is a Structured Streaming option
# that a batch read does not use. Both were removed as misleading no-ops.
df_bronze_raw = (
    spark.read
    .format("csv")
    .option("sep", ",")
    .option("header", "true")
    .load(f'{path_bronze}/base_de_respostas.csv')
)

# Convert CamelCase headers to snake_case: insert "_" between a lowercase
# letter/digit and the following uppercase letter, then lowercase everything.
new_columns = [
    re.sub('([a-z0-9])([A-Z])', r'\1_\2', column_name).lower()
    for column_name in df_bronze_raw.columns
]

df_read_bronze = df_bronze_raw.toDF(*new_columns)

# Replace missing values in the columns used by the gold dimensions.
# NOTE(review): a missing converted_salary becomes 0 and will then take part
# in every salary average computed downstream - confirm this is intended.
df_transform = df_read_bronze.fillna({
    'company_size': 'Others',
    'operating_system': 'Others',
    'communication_tools': 'Others',
    'language_worked_with': 'Others',
    'converted_salary': 0,
})

df_transform.createOrReplaceTempView("vw_transform")

# Rows without a country are discarded.
df_clean_country = spark.sql("""
    SELECT * 
    FROM vw_transform
    where country IS NOT NULL
""")

df_clean_country.createOrReplaceTempView("vw_silver_base_respostas")

# BRONZE to SILVER
# Typed projection of [base_resposta]: every column is cast explicitly and a
# few derived columns are added (nome, programa_hobby, contrib_open_source,
# salario_anual, salario). The inner DENSE_RANK keeps only the rows whose
# loading_date_stage is the most recent one found in the view.
silver_base_respostas_sql = """
        SELECT
            CAST(respondent AS INTEGER) AS respondent,
            CAST(concat('respondent_',respondent) AS STRING) AS nome,
            CAST(hobby AS STRING) AS hobby,
            CAST(hobby AS STRING) AS programa_hobby,
            CAST(open_source AS STRING) AS open_source,
            CAST(open_source AS STRING) AS contrib_open_source,
            CAST(country AS STRING) AS country,
            CAST(student AS STRING) AS student,
            CAST(employment AS STRING) AS employment,
            CAST(formal_education AS STRING) AS formal_education,
            CAST(undergrad_major AS STRING) AS undergrad_major,
            CAST(company_size AS STRING) AS company_size,
            CAST(dev_type AS STRING) AS dev_type,
            CAST(years_coding AS STRING) AS years_coding,
            CAST(years_coding_prof AS STRING) AS years_coding_prof,
            CAST(job_satisfaction AS STRING) AS job_satisfaction,
            CAST(career_satisfaction AS STRING) AS career_satisfaction,
            CAST(hope_five_years AS STRING) AS hope_five_years,
            CAST(job_search_status AS STRING) AS job_search_status,
            CAST(last_new_job AS STRING) AS last_new_job,
            CAST(assess_job1 AS DOUBLE) AS assess_job1,
            CAST(assess_job2 AS DOUBLE) AS assess_job2,
            CAST(assess_job3 AS DOUBLE) AS assess_job3,
            CAST(assess_job4 AS DOUBLE) AS assess_job4,
            CAST(assess_job5 AS DOUBLE) AS assess_job5,
            CAST(assess_job6 AS DOUBLE) AS assess_job6,
            CAST(assess_job7 AS DOUBLE) AS assess_job7,
            CAST(assess_job8 AS DOUBLE) AS assess_job8,
            CAST(assess_job9 AS DOUBLE) AS assess_job9,
            CAST(assess_job10 AS DOUBLE) AS assess_job10,
            CAST(assess_benefits1 AS DOUBLE) AS assess_benefits1,
            CAST(assess_benefits2 AS DOUBLE) AS assess_benefits2,
            CAST(assess_benefits3 AS DOUBLE) AS assess_benefits3,
            CAST(assess_benefits4 AS DOUBLE) AS assess_benefits4,
            CAST(assess_benefits5 AS DOUBLE) AS assess_benefits5,
            CAST(assess_benefits6 AS DOUBLE) AS assess_benefits6,
            CAST(assess_benefits7 AS DOUBLE) AS assess_benefits7,
            CAST(assess_benefits8 AS DOUBLE) AS assess_benefits8,
            CAST(assess_benefits9 AS DOUBLE) AS assess_benefits9,
            CAST(assess_benefits10 AS DOUBLE) AS assess_benefits10,
            CAST(assess_benefits11 AS DOUBLE) AS assess_benefits11,
            CAST(job_contact_priorities1 AS DOUBLE) AS job_contact_priorities1,
            CAST(job_contact_priorities2 AS DOUBLE) AS job_contact_priorities2,
            CAST(job_contact_priorities3 AS DOUBLE) AS job_contact_priorities3,
            CAST(job_contact_priorities4 AS DOUBLE) AS job_contact_priorities4,
            CAST(job_contact_priorities5 AS DOUBLE) AS job_contact_priorities5,
            CAST(job_email_priorities1 AS DOUBLE) AS job_email_priorities1,
            CAST(job_email_priorities2 AS DOUBLE) AS job_email_priorities2,
            CAST(job_email_priorities3 AS DOUBLE) AS job_email_priorities3,
            CAST(job_email_priorities4 AS DOUBLE) AS job_email_priorities4,
            CAST(job_email_priorities5 AS DOUBLE) AS job_email_priorities5,
            CAST(job_email_priorities6 AS DOUBLE) AS job_email_priorities6,
            CAST(job_email_priorities7 AS DOUBLE) AS job_email_priorities7,
            CAST(update_cv AS STRING) AS update_cv,
            CAST(currency AS STRING) AS currency,
            CAST(salary AS INTEGER) AS salary,
            CAST(salary_type AS STRING) AS salary_type,
            CAST(converted_salary AS DOUBLE) AS converted_salary,
            CAST(round(converted_salary*3.81, 2) AS DOUBLE) as salario_anual,
            CAST(round(converted_salary*3.81/12, 2) AS DOUBLE) as salario,
            CAST(currency_symbol AS STRING) AS currency_symbol,
            CAST(communication_tools AS STRING) AS communication_tools,
            CAST(time_fully_productive AS STRING) AS time_fully_productive,
            CAST(education_types AS STRING) AS education_types,
            CAST(self_taught_types AS STRING) AS self_taught_types,
            CAST(time_after_bootcamp AS STRING) AS time_after_bootcamp,
            CAST(hackathon_reasons AS STRING) AS hackathon_reasons,
            CAST(agree_disagree1 AS STRING) AS agree_disagree1,
            CAST(agree_disagree2 AS STRING) AS agree_disagree2,
            CAST(agree_disagree3 AS STRING) AS agree_disagree3,
            CAST(language_worked_with AS STRING) AS language_worked_with,
            CAST(language_desire_next_year AS STRING) AS language_desire_next_year,
            CAST(database_worked_with AS STRING) AS database_worked_with,
            CAST(database_desire_next_year AS STRING) AS database_desire_next_year,
            CAST(platform_worked_with AS STRING) AS platform_worked_with,
            CAST(platform_desire_next_year AS STRING) AS platform_desire_next_year,
            CAST(framework_worked_with AS STRING) AS framework_worked_with,
            CAST(framework_desire_next_year AS STRING) AS framework_desire_next_year,
            CAST(ide AS STRING) AS ide,
            CAST(operating_system AS STRING) AS operating_system,
            CAST(number_monitors AS INTEGER) AS number_monitors,
            CAST(methodology AS STRING) AS methodology,
            CAST(version_control AS STRING) AS version_control,
            CAST(check_in_code AS STRING) AS check_in_code,
            CAST(ad_blocker AS STRING) AS ad_blocker,
            CAST(ad_blocker_disable AS STRING) AS ad_blocker_disable,
            CAST(ad_blocker_reasons AS STRING) AS ad_blocker_reasons,
            CAST(ads_agree_disagree1 AS STRING) AS ads_agree_disagree1,
            CAST(ads_agree_disagree2 AS STRING) AS ads_agree_disagree2,
            CAST(ads_agree_disagree3 AS STRING) AS ads_agree_disagree3,
            CAST(ads_actions AS STRING) AS ads_actions,
            CAST(ads_priorities1 AS DOUBLE) AS ads_priorities1,
            CAST(ads_priorities2 AS DOUBLE) AS ads_priorities2,
            CAST(ads_priorities3 AS DOUBLE) AS ads_priorities3,
            CAST(ads_priorities4 AS DOUBLE) AS ads_priorities4,
            CAST(ads_priorities5 AS DOUBLE) AS ads_priorities5,
            CAST(ads_priorities6 AS DOUBLE) AS ads_priorities6,
            CAST(ads_priorities7 AS DOUBLE) AS ads_priorities7,
            CAST(aidangerous AS STRING) AS aidangerous,
            CAST(aiinteresting AS STRING) AS aiinteresting,
            CAST(airesponsible AS STRING) AS airesponsible,
            CAST(aifuture AS STRING) AS aifuture,
            CAST(ethics_choice AS STRING) AS ethics_choice,
            CAST(ethics_report AS STRING) AS ethics_report,
            CAST(ethics_responsible AS STRING) AS ethics_responsible,
            CAST(ethical_implications AS STRING) AS ethical_implications,
            CAST(stack_overflow_recommend AS INTEGER) AS stack_overflow_recommend,
            CAST(stack_overflow_visit AS STRING) AS stack_overflow_visit,
            CAST(stack_overflow_has_account AS STRING) AS stack_overflow_has_account,
            CAST(stack_overflow_participate AS STRING) AS stack_overflow_participate,
            CAST(stack_overflow_jobs AS STRING) AS stack_overflow_jobs,
            CAST(stack_overflow_dev_story AS STRING) AS stack_overflow_dev_story,
            CAST(stack_overflow_jobs_recommend AS INTEGER) AS stack_overflow_jobs_recommend,
            CAST(stack_overflow_consider_member AS STRING) AS stack_overflow_consider_member,
            CAST(hypothetical_tools1 AS STRING) AS hypothetical_tools1,
            CAST(hypothetical_tools2 AS STRING) AS hypothetical_tools2,
            CAST(hypothetical_tools3 AS STRING) AS hypothetical_tools3,
            CAST(hypothetical_tools4 AS STRING) AS hypothetical_tools4,
            CAST(hypothetical_tools5 AS STRING) AS hypothetical_tools5,
            CAST(wake_time AS STRING) AS wake_time,
            CAST(hours_computer AS STRING) AS hours_computer,
            CAST(hours_outside AS STRING) AS hours_outside,
            CAST(skip_meals AS STRING) AS skip_meals,
            CAST(ergonomic_devices AS STRING) AS ergonomic_devices,
            CAST(exercise AS STRING) AS exercise,
            CAST(gender AS STRING) AS gender,
            CAST(sexual_orientation AS STRING) AS sexual_orientation,
            CAST(education_parents AS STRING) AS education_parents,
            CAST(race_ethnicity AS STRING) AS race_ethnicity,
            CAST(age AS STRING) AS age,
            CAST(dependents AS STRING) AS dependents,
            CAST(military_us AS STRING) AS military_us,
            CAST(survey_too_long AS STRING) AS survey_too_long,
            CAST(survey_easy AS STRING) AS survey_easy,
            CAST(loading_date_stage AS TIMESTAMP) AS loading_date_stage
        FROM
            (
            SELECT DENSE_RANK() OVER(ORDER BY loading_date_stage DESC) AS rank, *
            FROM vw_silver_base_respostas
            ) AS T
        WHERE
            T.rank = 1
            """

df_ss_silver_base_respostas = spark.sql(silver_base_respostas_sql)
# Persist the typed snapshot in the silver zone as CSV with a header row,
# replacing whatever a previous run left at that path.
silver_writer = (
    df_ss_silver_base_respostas.write
    .mode("overwrite")
    .option('header', 'true')
)
silver_writer.csv(f"{path_silver}/base_de_respostas.csv")

                                                                                

#### Gold

In [4]:
# Load the silver snapshot; no schema is supplied, so the columns come back
# as strings. `maxFilesPerTrigger` was removed: it is a Structured Streaming
# option and is not used by a batch read.
df_read_silver = (
    spark.read.format("csv")
    .option("sep", ",")
    .option("header", "true")
    .load(f"{path_silver}/base_de_respostas.csv")
)

# Every gold query below selects from this view.
df_read_silver.createOrReplaceTempView("vw_base_silver")

# write dataframe [empresa] in gold zone
# Zero-based surrogate id per distinct company size.
# The id is numbered by company_size itself. The previous
# `ORDER BY (SELECT 1)` tied every row, so the numbering was not guaranteed to
# be the same each time this lazy DataFrame is evaluated (once for the CSV
# write, again for every join through vw_empresa), which could leave the ids
# in empresa.csv out of sync with the foreign keys in respondente.csv.
df_empresa = spark.sql("""
    select ROW_NUMBER() OVER (ORDER BY company_size ASC) - 1 as id, company_size AS tamanho
    from 
    (
        select distinct company_size
        from vw_base_silver
    )
""")

(
    df_empresa.write
    .mode("overwrite")
    .option('header', 'true')
    .csv(f"{path_gold}/empresa.csv")
)

df_empresa.createOrReplaceTempView('vw_empresa')

# write dataframe [pais] in gold zone
# Zero-based surrogate id per distinct country.
# Numbering is ordered by country so it is reproducible; `ORDER BY (SELECT 1)`
# tied every row and gave no guarantee that the ids written to pais.csv match
# the ones produced when vw_pais is re-evaluated in later joins.
df_pais = spark.sql("""
    select ROW_NUMBER() OVER (ORDER BY country ASC) - 1 as id, country AS nome
    from 
    (
        select distinct country
        from vw_base_silver
    )
""")

(
    df_pais.write
    .mode("overwrite")
    .option('header', 'true')
    .csv(f"{path_gold}/pais.csv")
)

df_pais.createOrReplaceTempView('vw_pais')

# write dataframe [sistema_operacional] in gold zone
# Zero-based surrogate id per distinct operating system.
# Numbering is ordered by operating_system so it is reproducible;
# `ORDER BY (SELECT 1)` tied every row and gave no guarantee that the ids
# written to sistema_operacional.csv match the ones produced when
# vw_sistema_operacional is re-evaluated in later joins.
df_sistema_operacional = spark.sql(
    """
    select ROW_NUMBER() OVER (ORDER BY operating_system ASC) - 1 as id, operating_system AS nome
    from 
    (
        select distinct operating_system
        from vw_base_silver
    );
""")

(
    df_sistema_operacional.write
    .mode("overwrite")
    .option('header', 'true')
    .csv(f"{path_gold}/sistema_operacional.csv")
)

df_sistema_operacional.createOrReplaceTempView('vw_sistema_operacional')

# write dataframe [ferramenta_comunic] in gold zone
# communication_tools holds a ';'-separated list; each trimmed entry becomes
# one dimension row with a zero-based surrogate id.
# Numbering is ordered by nome so it is reproducible; the previous
# `ORDER BY (SELECT 1)` over an unordered DISTINCT gave no guarantee that the
# ids written to ferramenta_comunic.csv match the ones produced when
# vw_ferramenta_comunic is re-evaluated for resp_usa_ferramenta.
df_ferramenta_comunic = spark.sql("""
    select ROW_NUMBER() OVER (ORDER BY nome ASC) - 1 as id, nome
    from 
    (
        SELECT DISTINCT trim(split_communication_tools) as nome
        FROM vw_base_silver
        LATERAL VIEW explode(split(communication_tools, ';')) AS split_communication_tools
    );
""")

(
    df_ferramenta_comunic.write
    .mode("overwrite")
    .option('header', 'true')
    .csv(f"{path_gold}/ferramenta_comunic.csv")
)

df_ferramenta_comunic.createOrReplaceTempView("vw_ferramenta_comunic")

# write dataframe [linguagem_programacao] in gold zone
# language_worked_with holds a ';'-separated list; each trimmed entry becomes
# one dimension row with a zero-based surrogate id.
# Numbering is ordered by nome so it is reproducible; the previous
# `ORDER BY (SELECT 1)` over an unordered DISTINCT gave no guarantee that the
# ids written to linguagem_programacao.csv match the ones produced when
# vw_linguagem_programacao is re-evaluated for resp_usa_linguagem.
df_linguagem_programacao = spark.sql("""
    select ROW_NUMBER() OVER (ORDER BY nome ASC) - 1 as id, nome
    from 
    (
        SELECT DISTINCT trim(split_language_worked_with) as nome
        FROM vw_base_silver
        LATERAL VIEW explode(split(language_worked_with, ';')) AS split_language_worked_with
    );
""")

(
    df_linguagem_programacao.write
    .mode("overwrite")
    .option('header', 'true')
    .csv(f"{path_gold}/linguagem_programacao.csv")
)

df_linguagem_programacao.createOrReplaceTempView("vw_linguagem_programacao")

# write dataframe [resp_usa_ferramenta] in gold zone
# Bridge table: one row per (communication tool, respondent) pair.
# - The exploded token is trimmed before joining, because the dimension view
#   stores trimmed names; an untrimmed token would silently drop out of the
#   inner join.
# - The second column is named respondente_id to match the data-warehouse
#   table definition (the column order is unchanged).
df_resp_usa_ferramenta = spark.sql("""
    with respondent_communication_tools as 
    (
        select respondent, trim(split_communication_tools) as ferramenta
        from vw_base_silver
        LATERAL VIEW explode(split(communication_tools, ';')) AS split_communication_tools
    )
    select vfc.id as ferramenta_comunic_id, rct.respondent as respondente_id
    from vw_ferramenta_comunic as vfc
    inner join respondent_communication_tools as rct
    on vfc.nome = rct.ferramenta;
""")

(
    df_resp_usa_ferramenta.write
    .mode("overwrite")
    .option('header', 'true')
    .csv(f"{path_gold}/resp_usa_ferramenta.csv")
)

# write dataframe [resp_usa_linguagem] in gold zone
# Bridge table: one row per (respondent, language) pair, carrying
# years_coding as `momento`.
# - The exploded token is trimmed before joining, because the dimension view
#   stores trimmed names; an untrimmed token would silently drop out of the
#   inner join.
# - Columns are named respondente_id / linguagem_programacao_id to match the
#   data-warehouse table definition (the column order is unchanged).
df_resp_usa_linguagem = spark.sql("""
    with respondent_language AS 
    (
        select respondent, trim(split_language_worked_with) as linguagem, years_coding
        from vw_base_silver
        LATERAL VIEW explode(split(language_worked_with, ';')) AS split_language_worked_with
    )
    select rlg.respondent as respondente_id, vlp.id as linguagem_programacao_id, rlg.years_coding as momento
    from respondent_language as rlg
    inner join vw_linguagem_programacao as vlp
    on rlg.linguagem = vlp.nome
""")

(
    df_resp_usa_linguagem.write
    .mode("overwrite")
    .option('header', 'true')
    .csv(f"{path_gold}/resp_usa_linguagem.csv")
)

# write dataframe [respondent] in gold zone
# Fact table: one row per respondent, with the company size, country and
# operating system replaced by the surrogate ids of their dimension views.
# Inner joins are used, so a respondent whose value is missing from any of
# the three dimensions would be dropped.
df_respondente = spark.sql("""
    with tabela_fato as
    (
        select
        respondent as id, nome,
        contrib_open_source, programa_hobby, salario, 
        operating_system, country, company_size 
        from vw_base_silver
    )
    select 
        tbf.`id`, tbf.nome, contrib_open_source, programa_hobby, 
        salario, vso.`id` as sistema_operacional_id, 
        vpa.`id` as pais_id, vem.`id` as empresa_id
    from tabela_fato as tbf
    inner join vw_empresa vem
    on tbf.company_size = vem.tamanho
    inner join vw_pais as vpa
    on tbf.country = vpa.nome
    inner join vw_sistema_operacional as vso
    on vso.nome = tbf.operating_system
""")

(
    df_respondente.write
    .mode("overwrite")
    .option('header', 'true')
    .csv(f"{path_gold}/respondente.csv")
)

df_respondente.createOrReplaceTempView('vw_respondente')

# Preview five rows. The limit is applied in Spark so only those rows are
# brought to the driver, instead of collecting the whole table with
# toPandas() and slicing afterwards.
df_respondente.limit(5).toPandas()

                                                                                

Unnamed: 0,id,nome,contrib_open_source,programa_hobby,salario,sistema_operacional_id,pais_id,empresa_id
0,101346,respondent_101346,No,No,15875.0,4,130,3
1,44791,respondent_44791,No,Yes,25257.76,4,42,0
2,32306,respondent_32306,Yes,Yes,39687.5,4,130,2
3,37142,respondent_37142,No,Yes,0.0,2,130,1
4,21745,respondent_21745,Yes,Yes,0.0,4,81,4


#### Data Warehouse

In [5]:
# create database [ame_digital]
# The database is dropped and recreated so the cell can be re-run safely.
spark.sql("DROP DATABASE IF EXISTS ame_digital CASCADE")
spark.sql("CREATE DATABASE IF NOT EXISTS ame_digital")
spark.sql("USE ame_digital")

# Data-warehouse tables: table name -> ordered (column name, SQL type) pairs.
# Each table is backed by the gold CSV directory "<path_gold>/<table>.csv".
dw_tables = {
    "empresa": [
        ("id", "integer"),
        ("tamanho", "string"),
    ],
    "pais": [
        ("id", "integer"),
        ("nome", "string"),
    ],
    "sistema_operacional": [
        ("id", "integer"),
        ("nome", "string"),
    ],
    "ferramenta_comunic": [
        ("id", "integer"),
        ("nome", "string"),
    ],
    "linguagem_programacao": [
        ("id", "integer"),
        ("nome", "string"),
    ],
    "resp_usa_ferramenta": [
        ("ferramenta_comunic_id", "integer"),
        ("respondente_id", "integer"),
    ],
    "resp_usa_linguagem": [
        ("respondente_id", "integer"),
        ("linguagem_programacao_id", "integer"),
        ("momento", "string"),
    ],
    "respondente": [
        ("id", "integer"),
        ("nome", "string"),
        ("contrib_open_source", "string"),
        ("programa_hobby", "string"),
        ("salario", "double"),
        ("sistema_operacional_id", "integer"),
        ("pais_id", "integer"),
        ("empresa_id", "integer"),
    ],
}


def create_gold_csv_table(table_name, columns):
    """Drop and recreate one CSV-backed table over its gold directory.

    Args:
        table_name: Name of the table; also the gold directory stem.
        columns: Ordered (column name, SQL type) pairs for the table schema.

    The schema is given explicitly, so `inferSchema` (present in the earlier
    hand-written statements) is not needed and is left out.
    """
    column_ddl = ",\n        ".join(
        f"`{column_name}` {column_type}" for column_name, column_type in columns
    )
    spark.sql(f"drop table if exists {table_name}")
    spark.sql(f"""
    create table if not exists {table_name}
    (
        {column_ddl}
    )
    USING CSV
    OPTIONS(
        header="true",
        delimiter=",",
        path="{path_gold}/{table_name}.csv"
    )
    """)


for dw_table_name, dw_table_columns in dw_tables.items():
    create_gold_csv_table(dw_table_name, dw_table_columns)


DataFrame[]

#### Análise dos dados - Data Warehouse

1. Qual a quantidade de respondentes de cada país?

In [9]:
# Respondents per country, largest count first; the first five rows are shown.
respondentes_por_pais_sql = """
    select pais.nome, count(*) as qtd_total
    from respondente as rpdt
    inner join pais 
    on rpdt.pais_id = pais.`id`
    group by pais.nome
    order by qtd_total desc;           
"""
respondentes_por_pais = spark.sql(respondentes_por_pais_sql)
respondentes_por_pais.toPandas().head()

Unnamed: 0,nome,qtd_total
0,United States,2350
1,India,1124
2,United Kingdom,749
3,Germany,655
4,Canada,360


2. Quantos usuários que moram em "United States" gostam de Windows?

In [10]:
# Count of respondents located in "United States" whose operating system is
# "Windows". The CTE narrows to the country; the outer query narrows to the OS.
windows_usa_sql = """
    with os_pais_usa as 
    (
        select pais.nome as pais, os.nome as sistema_operacional
        from respondente as rpdt
        inner join pais 
        on rpdt.pais_id = pais.`id`
        inner join sistema_operacional as os
        on rpdt.sistema_operacional_id = os.`id`
        where pais.nome == "United States"
    )
    select sistema_operacional, count(*) as total_qtd
    from os_pais_usa
    where sistema_operacional = "Windows"
    group by sistema_operacional
    order by total_qtd desc
"""
windows_usa = spark.sql(windows_usa_sql)
windows_usa.toPandas().head()


Unnamed: 0,sistema_operacional,total_qtd
0,Windows,961


3. Qual a média de salário dos usuários que moram em Israel e gostam de Linux?

In [11]:
# Average salary of respondents in "Israel" whose operating system name
# contains "Linux". `avg(...) over()` puts the overall mean on every filtered
# row and `limit 1` keeps a single one of them for display.
salario_israel_linux_sql = """
  with israel_salario_linux as 
  (
    select pais.nome as pais, os.nome as sistema_operacional, round(rpdt.salario, 2) as salario
    from respondente as rpdt
    inner join pais 
    on rpdt.pais_id = pais.`id`
    inner join sistema_operacional as os
    on rpdt.sistema_operacional_id = os.`id`
    where pais.nome == "Israel" and os.nome like "%Linux%"
  )
  select pais, sistema_operacional,
  round(avg(salario) over(), 2) as salario_medio
  from israel_salario_linux
  limit 1;
"""
salario_israel_linux = spark.sql(salario_israel_linux_sql)
salario_israel_linux.toPandas().head()

Unnamed: 0,pais,sistema_operacional,salario_medio
0,Israel,Linux-based,19278.15


4. Qual a média e o desvio padrão do salário dos usuários que usam Slack para cada tamanho de empresa disponível?

In [19]:
# Mean and standard deviation of salary per company size for respondents who
# use "Slack". The nested CTEs go respondent -> company size -> tool id ->
# tool name before aggregating.
# NOTE(review): the innermost CTE keeps only rows with programa_hobby 'No',
# which the question text does not mention - confirm this filter is intended.
slack_por_tamanho_sql = """
with salario_empresa_slack as
(
  with empresa_ferramenta_id as
  (
    with salario_empresa as
    (
      select rpdt.`id`, rpdt.salario, emp.tamanho 
      from respondente as rpdt
      inner join empresa as emp 
      on rpdt.empresa_id = emp.`id`
      where rpdt.programa_hobby like 'No'
    )
    select `id`, salario, tamanho, ferramenta_comunic_id
    from salario_empresa as semp
    inner join resp_usa_ferramenta as ruf
    on semp.`id` = ruf.respondente_id
  )
  select efi.`id`, efi.salario, efi.tamanho, fco.nome
  from empresa_ferramenta_id as efi
  inner join ferramenta_comunic fco
  on efi.ferramenta_comunic_id = fco.`id`
  where fco.nome == "Slack"
)
select tamanho, round(avg(salario), 2) as media_salario, round(stddev(salario), 2) as desvio_p_salario
from salario_empresa_slack as ses
group by tamanho;
"""
user_slack_company = spark.sql(slack_por_tamanho_sql)

# Show up to ten company-size rows.
user_slack_company.toPandas().head(10)

Unnamed: 0,tamanho,media_salario,desvio_p_salario
0,Fewer than 10 employees,20136.98,31788.78
1,100 to 499 employees,32889.28,56389.19
2,"5,000 to 9,999 employees",55232.78,135550.09
3,"1,000 to 4,999 employees",37202.95,64732.08
4,20 to 99 employees,32882.74,65515.58
5,500 to 999 employees,19301.02,16146.19
6,"10,000 or more employees",29963.25,58739.59
7,Others,30810.29,53100.91
8,10 to 19 employees,22170.6,46713.61


5. Qual a diferença entre a média de salário dos respondentes do Brasil que acham que criar código é um hobby e a média de salário de todos os respondentes brasileiros, agrupada por cada sistema operacional que eles usam?

In [20]:
# For respondents in "Brazil", per operating system: the overall average
# salary, the average among those with programa_hobby 'Yes', and the absolute
# difference between the two. The inner join keeps only operating systems
# that appear in both aggregates.
diff_media_brasil_sql = """
    with media_geral_os as
    (
      select os.nome as sistema_operacional, round(avg(rpdt.salario), 2) as media_geral
      from respondente as rpdt
      inner join pais as pa
      on rpdt.pais_id = pa.`id`
      inner join sistema_operacional as os
      on rpdt.sistema_operacional_id = os.`id`
      where pa.nome == "Brazil"
      group by sistema_operacional
    ),
    media_hobby_os as 
    (
      select os.nome as sistema_operacional, round(avg(rpdt.salario), 2) as media_hobby
      from respondente as rpdt
      inner join pais as pa
      on rpdt.pais_id = pa.`id`
      inner join sistema_operacional as os
      on rpdt.sistema_operacional_id = os.`id`
      where pa.nome == "Brazil"
        and rpdt.programa_hobby == 'Yes'
      group by sistema_operacional
    )
    select mgo.sistema_operacional, mgo.media_geral, mho.media_hobby,
    round(abs(mgo.media_geral - mho.media_hobby), 2) as diff_media
    from media_geral_os as mgo
    inner join media_hobby_os as mho
    on mgo.sistema_operacional = mho.sistema_operacional
    order by mgo.sistema_operacional asc;
"""
diff_media_brasil = spark.sql(diff_media_brasil_sql)
diff_media_brasil.toPandas().head(10)


Unnamed: 0,sistema_operacional,media_geral,media_hobby,diff_media
0,BSD/Unix,264490.2,264490.2,0.0
1,Linux-based,10639.82,12079.34,1439.52
2,MacOS,8596.77,9320.15,723.38
3,Others,2779.93,3188.75,408.82
4,Windows,9366.6,10828.9,1462.3


6. Quais são as top 3 tecnologias mais usadas pelos desenvolvedores?

In [15]:
# Three most frequently listed languages: respondents are joined to the
# bridge table and then to the language dimension, and rows are counted per
# language name.
top_tecnologias_sql = """
    with tecnologias_respondente as
    (
      with respondente_linguagem as 
      (
        select *
        from respondente as rpdt
        inner join resp_usa_linguagem as rul
        on rpdt.`id` = rul.respondente_id
      )
      select rl.`id`, lp.nome
      from respondente_linguagem as rl
      inner join linguagem_programacao as lp
      on rl.linguagem_programacao_id = lp.`id`
    )
    select nome as tecnologia, count(*) as total_qtd
    from tecnologias_respondente
    group by nome
    order by total_qtd desc
    limit 3;
"""
top_tecnologias = spark.sql(top_tecnologias_sql)
top_tecnologias.toPandas().head()


Unnamed: 0,tecnologia,total_qtd
0,JavaScript,6286
1,HTML,6081
2,CSS,5810


7. Quais são os top 5 países em questão de salário?

In [16]:
# Five countries with the highest average salary, considering only countries
# with more than 10 counted salaries.
# NOTE(review): the first CTE keeps only rows with programa_hobby = 'No',
# which the question text does not mention - confirm this filter is intended.
top_paises_salario_sql = """
    with salario_pais as
    (
      select pa.nome, rpdt.salario
      from respondente as rpdt
      inner join pais as pa
      on rpdt.pais_id = pa.`id`
      where rpdt.programa_hobby = 'No'
    ), 
    media_salario as
    (
        select nome as pais, round(avg(salario), 2) as salario_medio, count(salario) as qtd_empregados
        from salario_pais
        group by pais
        order by salario_medio desc
    )
    select pais, salario_medio
    from media_salario
    where qtd_empregados > 10
    order by salario_medio desc
    limit 5;
"""
top_paises_salario = spark.sql(top_paises_salario_sql)
top_paises_salario.toPandas().head()

Unnamed: 0,pais,salario_medio
0,Ireland,95661.33
1,New Zealand,46951.73
2,Australia,38826.03
3,United States,37461.7
4,Canada,29266.62


8. A tabela abaixo contém os salários mínimos mensais de cinco países presentes na amostra de dados. Baseado nesses valores, gostaríamos de saber quantos usuários ganham mais de 5 salários mínimos em cada um desses países.

![fig](imagens/Picture6.png)

In [17]:
# Minimum wage per country, exposed to SQL as the view vw_salario_minimo.
columns_salario = ['nome_pais', 'salario_minimo']
datas_salarios_minimos = [
    ('United States', 4787.90),
    ('India', 243.52),
    ('United Kingdom', 6925.63),
    ('Germany', 6664.00),
    ('Canada', 5567.68),
]

salarios_minimos_sem_nome = spark.createDataFrame(datas_salarios_minimos)
salarios_minimos = salarios_minimos_sem_nome.toDF(*columns_salario)
salarios_minimos.createOrReplaceTempView('vw_salario_minimo')
salarios_minimos.show()

[Stage 205:>                                                        (0 + 1) / 1]

+--------------+--------------+
|     nome_pais|salario_minimo|
+--------------+--------------+
| United States|        4787.9|
|         India|        243.52|
|United Kingdom|       6925.63|
|       Germany|        6664.0|
|        Canada|       5567.68|
+--------------+--------------+



                                                                                

In [18]:
# Per country in vw_salario_minimo, the number of respondents whose salario
# is greater than five times that country's minimum wage.
acima_cinco_minimos_sql = """
    with maior_cinco_salarios_minimos as 
    (
        with selecao_paises_salario as 
        (
        select rpdt.`id`, pa.nome as pais, salario
        from respondente as rpdt
        inner join pais as pa
        on rpdt.pais_id = pa.`id`
        where pa.nome = "United States"
            or pa.nome = "United Kingdom"
            or pa.nome ="India" 
            or pa.nome ="Germany" 
            or pa.nome ="Canada"
        )
        select *,
        5*salario_minimo as cinco_salarios_minimos
        from vw_salario_minimo as vsm
        inner join selecao_paises_salario sps
        on vsm.nome_pais = sps.pais
    )
    select pais, count(*) as total
    from maior_cinco_salarios_minimos
    where salario > cinco_salarios_minimos
    group by pais
    order by total desc;
"""
acima_cinco_minimos = spark.sql(acima_cinco_minimos_sql)
acima_cinco_minimos.toPandas().head()

Unnamed: 0,pais,total
0,United States,1385
1,India,496
2,United Kingdom,128
3,Canada,60
4,Germany,52
