In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T

In [2]:
# Criação da sessão Spark
spark = SparkSession.builder \
    .appName("PySpark BigQuery Connection") \
    .config('spark.jars.packages', 'com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.23.2') \
    .config("spark.jars", "/usr/local/lib/spark-connectors/bigquery-connector-hadoop2-latest.jar") \
    .config("spark.driver.extraJavaOptions", "-Dio.netty.tryReflectionSetAccessible=true -Dio.netty.noUnsafe=true") \
    .config("spark.executor.extraJavaOptions", "-Dio.netty.tryReflectionSetAccessible=true -Dio.netty.noUnsafe=true") \
    .getOrCreate()

24/09/30 17:37:46 WARN Utils: Your hostname, spark-VirtualBox resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
24/09/30 17:37:46 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/spark/.ivy2/cache
The jars for the packages stored in: /home/spark/.ivy2/jars
com.google.cloud.spark#spark-bigquery-with-dependencies_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-aefdc8d1-e549-4a1a-8ae9-499a55bddbc4;1.0
	confs: [default]
	found com.google.cloud.spark#spark-bigquery-with-dependencies_2.12;0.23.2 in central
:: resolution report :: resolve 164ms :: artifacts dl 2ms
	:: modules in use:
	com.google.cloud.spark#spark-bigquery-with-dependencies_2.12;0.23.2 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   1   |   0   |   0   |   0   ||   1   |   0   |
	-------------------------------------------------------------------

In [3]:
spark.conf.set("viewsEnabled", True)
spark.conf.set("materializationDataset", "SOR")

In [4]:
sc = spark.sparkContext
sc.setLogLevel("INFO")
sc._jsc.hadoopConfiguration().set('fs.gs.impl', 'com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem')
sc._jsc.hadoopConfiguration().set('fs.gs.auth.service.account.json.keyfile', '/usr/local/lib/gcp/credentials/my-project-1508437523553-e9bafe7e3368.json')

In [5]:
# Função para salvar DataFrame em formato Parquet
def save_to_bigquery(df, dataset, table_name):
    # Salva o DataFrame em formato Parquet
    df.write \
    .format("bigquery") \
    .option("table", f"{dataset.upper()}.{table_name}") \
    .option("temporaryGcsBucket", "meu-bucket-temporario-spark") \
    .option("credentialsFile", "/usr/local/lib/gcp/credentials/my-project-1508437523553-e9bafe7e3368.json") \
    .mode("overwrite") \
    .save()

In [6]:
# Função para ler dados do BigQuery
def read_from_bigquery(dataset, table_name):  
    df = spark.read \
        .format('bigquery') \
        .option('table', f"{dataset.upper()}.{table_name}") \
        .option("credentialsFile", "/usr/local/lib/gcp/credentials/my-project-1508437523553-e9bafe7e3368.json") \
        .load()

    return df

In [7]:
df_dimensao = read_from_bigquery('SOR', 'tbx002_dimensao_geral')
df_dimensao.createOrReplaceTempView("tbx002_dimensao_geral")

In [8]:
df = read_from_bigquery('SOR', 'tbx001_data')

In [9]:
# Dicionário de mapeamento de renomeação de colunas
col_rename_map = {
    "UF": "uf",
    "Ano": "ano",
    "V1013": "mes",
    "V1012": "semana",
    "A001B3": "ano_nascimento",
    "A003": "sexo",
    "A004": "cor_raca",
    "V1023": "tipo_area",
    "A005": "escolaridade",
    "B002": "foi_posto_saude",
    "B0031": "ficou_em_casa",
    "B005": "ficou_internado",
    "B009B": "resultado_covid",
    "B009D" : "resultado_covid_2",
    "B009F": "resultado_covid_3",
    "B007": "tem_plano_saude",
    "C01011": "faixa_rendimento",
    "F001": "situacao_domicilio",
    "B0011": "teve_febre",
    "B0012": "teve_tosse",
    "B0013": "teve_dor_garganta",
    "B0014": "teve_dificuldade_respirar",
    "B0015": "teve_dor_cabeca",
    "B0016": "teve_dor_peito",
    "B0017": "teve_nausea",
    "B0018": "teve_nariz_entupido_escorrendo",
    "B0019": "teve_fadiga",
    "B00110": "teve_dor_olhos",
    "B00111": "teve_perda_olfato_sabor",
    "B00112": "teve_dor_muscular",
    "B00113": "teve_diarreia"

}



# Aplicar a renomeação das colunas e selecionar apenas as colunas renomeadas
df = df.select([F.col(old_name).alias(new_name) for old_name, new_name in col_rename_map.items()])


In [10]:
df.printSchema()

root
 |-- uf: string (nullable = true)
 |-- ano: string (nullable = true)
 |-- mes: string (nullable = true)
 |-- semana: string (nullable = true)
 |-- ano_nascimento: string (nullable = true)
 |-- sexo: string (nullable = true)
 |-- cor_raca: string (nullable = true)
 |-- tipo_area: string (nullable = true)
 |-- escolaridade: string (nullable = true)
 |-- foi_posto_saude: string (nullable = true)
 |-- ficou_em_casa: string (nullable = true)
 |-- ficou_internado: string (nullable = true)
 |-- resultado_covid: string (nullable = true)
 |-- resultado_covid_2: string (nullable = true)
 |-- resultado_covid_3: string (nullable = true)
 |-- tem_plano_saude: string (nullable = true)
 |-- faixa_rendimento: string (nullable = true)
 |-- situacao_domicilio: string (nullable = true)
 |-- teve_febre: string (nullable = true)
 |-- teve_tosse: string (nullable = true)
 |-- teve_dor_garganta: string (nullable = true)
 |-- teve_dificuldade_respirar: string (nullable = true)
 |-- teve_dor_cabeca: strin

In [11]:
df.createOrReplaceTempView("tbx001_data")

24/09/30 17:37:59 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


In [12]:
# realizar o join entre os dataframes, dimensionando a tabela de fatos
query = """
SELECT 
    TRIM(T2.categoria_descricao) AS estado,
    CAST(TRIM(T1.ano) AS INT) AS ano,
    CAST(TRIM(T1.mes) AS INT) AS mes,
    CAST(TRIM(T1.semana) AS INT) AS semana,
    CAST(TRIM(T1.ano_nascimento) AS INT) AS ano_nascimento,
    -- Definição da faixa etária com base na idade
    CASE
        WHEN (CAST(TRIM(T1.ano) AS INT) - CAST(TRIM(T1.ano_nascimento) AS INT)) BETWEEN 0 AND 17 THEN '0-17'
        WHEN (CAST(TRIM(T1.ano) AS INT) - CAST(TRIM(T1.ano_nascimento) AS INT)) BETWEEN 18 AND 29 THEN '18-29'
        WHEN (CAST(TRIM(T1.ano) AS INT) - CAST(TRIM(T1.ano_nascimento) AS INT)) BETWEEN 30 AND 39 THEN '30-39'
        WHEN (CAST(TRIM(T1.ano) AS INT) - CAST(TRIM(T1.ano_nascimento) AS INT)) BETWEEN 40 AND 49 THEN '40-49'
        WHEN (CAST(TRIM(T1.ano) AS INT) - CAST(TRIM(T1.ano_nascimento) AS INT)) BETWEEN 50 AND 59 THEN '50-59'
        WHEN (CAST(TRIM(T1.ano) AS INT) - CAST(TRIM(T1.ano_nascimento) AS INT)) BETWEEN 60 AND 69 THEN '60-69'
        WHEN (CAST(TRIM(T1.ano) AS INT) - CAST(TRIM(T1.ano_nascimento) AS INT)) BETWEEN 70 AND 79 THEN '70-79'
        WHEN (CAST(TRIM(T1.ano) AS INT) - CAST(TRIM(T1.ano_nascimento) AS INT)) BETWEEN 80 AND 89 THEN '80-89'
        WHEN (CAST(TRIM(T1.ano) AS INT) - CAST(TRIM(T1.ano_nascimento) AS INT)) BETWEEN 90 AND 99 THEN '90-99'
        ELSE '100+'
    END AS faixa_etaria,
    TRIM(T3.categoria_descricao) AS sexo,
    TRIM(T4.categoria_descricao) AS cor_raca,
    TRIM(T5.categoria_descricao) AS tipo_area,
    TRIM(T6.categoria_descricao) AS escolaridade,
    CASE
        WHEN T1.teve_febre = 1 THEN 'Sim'
        WHEN T1.teve_tosse = 1 THEN 'Sim'
        WHEN T1.teve_dor_garganta = 1 THEN 'Sim'
        WHEN T1.teve_dificuldade_respirar = 1 THEN 'Sim'
        WHEN T1.teve_dor_cabeca = 1 THEN 'Sim'
        WHEN T1.teve_dor_peito = 1 THEN 'Sim'
        WHEN T1.teve_nausea = 1 THEN 'Sim'
        WHEN T1.teve_nariz_entupido_escorrendo = 1 THEN 'Sim'
        WHEN T1.teve_fadiga = 1 THEN 'Sim'
        WHEN T1.teve_dor_olhos = 1 THEN 'Sim'
        WHEN T1.teve_perda_olfato_sabor = 1 THEN 'Sim'
        WHEN T1.teve_dor_muscular = 1 THEN 'Sim'
        WHEN T1.teve_diarreia = 1 THEN 'Sim'
        WHEN T1.teve_febre IS NULL
             AND T1.teve_tosse IS NULL
             AND T1.teve_dor_garganta IS NULL
             AND T1.teve_dificuldade_respirar IS NULL
             AND T1.teve_dor_cabeca IS NULL
             AND T1.teve_dor_peito IS NULL
             AND T1.teve_nausea IS NULL
             AND T1.teve_nariz_entupido_escorrendo IS NULL
             AND T1.teve_fadiga IS NULL
             AND T1.teve_dor_olhos IS NULL
             AND T1.teve_perda_olfato_sabor IS NULL
             AND T1.teve_dor_muscular IS NULL
             AND T1.teve_diarreia IS NULL 
        THEN NULL
        ELSE 'Não'
    END AS teve_sintomas_covid,
    TRIM(T7.categoria_descricao) AS foi_posto_saude,
    TRIM(T8.categoria_descricao) AS ficou_em_casa,
    TRIM(T9.categoria_descricao) AS ficou_internado,    
    CASE 
        when TRIM(T9.categoria_descricao) = 'Sim' or TRIM(T7.categoria_descricao) = 'Sim' then 'Sim'
        when TRIM(T7.categoria_descricao) is null and TRIM(T9.categoria_descricao) is null then null
        when TRIM(T9.categoria_descricao) is not null then TRIM(T9.categoria_descricao)
        when TRIM(T7.categoria_descricao) is not null then TRIM(T7.categoria_descricao)
        end as foi_ao_posto_ou_internado,   
    CASE
        WHEN T1.resultado_covid = 1 THEN 'Sim'
        WHEN T1.resultado_covid_2 = 1 THEN 'Sim'
        WHEN T1.resultado_covid_3 = 1 THEN 'Sim'
        WHEN T1.resultado_covid IS NULL 
             AND T1.resultado_covid_2 IS NULL 
             AND T1.resultado_covid_3 IS NULL THEN NULL      
        ELSE 'Não'
    END AS teve_covid,
    CASE
        WHEN T10.categoria_descricao IS NOT NULL THEN TRIM(T10.categoria_descricao)
        WHEN T11.categoria_descricao IS NOT NULL THEN TRIM(T11.categoria_descricao)
        WHEN T12.categoria_descricao IS NOT NULL THEN TRIM(T12.categoria_descricao)
    END AS resultado_covid,
    TRIM(T13.categoria_descricao) AS tem_plano_saude,
    TRIM(T14.categoria_descricao) AS faixa_rendimento,
    TRIM(T15.categoria_descricao) AS situacao_domicilio
FROM tbx001_data T1
LEFT JOIN tbx002_dimensao_geral T2 ON T2.codigo_variavel = 'UF' AND TRIM(T1.uf) = TRIM(T2.categoria_tipo)
LEFT JOIN tbx002_dimensao_geral T3 ON T3.codigo_variavel = 'A003' AND TRIM(T1.sexo) = TRIM(T3.categoria_tipo)
LEFT JOIN tbx002_dimensao_geral T4 ON T4.codigo_variavel = 'A004' AND TRIM(T1.cor_raca) = TRIM(T4.categoria_tipo)
LEFT JOIN tbx002_dimensao_geral T5 ON T5.codigo_variavel = 'V1023' AND TRIM(T1.tipo_area) = TRIM(T5.categoria_tipo)
LEFT JOIN tbx002_dimensao_geral T6 ON T6.codigo_variavel = 'A005' AND TRIM(T1.escolaridade) = TRIM(T6.categoria_tipo)
LEFT JOIN tbx002_dimensao_geral T7 ON T7.codigo_variavel = 'B002' AND TRIM(T1.foi_posto_saude) = TRIM(T7.categoria_tipo)
LEFT JOIN tbx002_dimensao_geral T8 ON T8.codigo_variavel = 'B0031' AND TRIM(T1.ficou_em_casa) = TRIM(T8.categoria_tipo)
LEFT JOIN tbx002_dimensao_geral T9 ON T9.codigo_variavel = 'B005' AND TRIM(T1.ficou_internado) = TRIM(T9.categoria_tipo)
LEFT JOIN tbx002_dimensao_geral T10 ON T10.codigo_variavel = 'B009B' AND TRIM(T1.resultado_covid) = TRIM(T10.categoria_tipo)
LEFT JOIN tbx002_dimensao_geral T11 ON T11.codigo_variavel = 'B009D' AND TRIM(T1.resultado_covid_2) = TRIM(T11.categoria_tipo)
LEFT JOIN tbx002_dimensao_geral T12 ON T12.codigo_variavel = 'B009F' AND TRIM(T1.resultado_covid_3) = TRIM(T12.categoria_tipo)
LEFT JOIN tbx002_dimensao_geral T13 ON T13.codigo_variavel = 'B007' AND TRIM(T1.tem_plano_saude) = TRIM(T13.categoria_tipo)
LEFT JOIN tbx002_dimensao_geral T14 ON T14.codigo_variavel = 'C01011' AND TRIM(T1.faixa_rendimento) = TRIM(T14.categoria_tipo)
LEFT JOIN tbx002_dimensao_geral T15 ON T15.codigo_variavel = 'F001' AND TRIM(T1.situacao_domicilio) = TRIM(T15.categoria_tipo)

"""

In [13]:
# Executar a consulta SQL
df_joined = spark.sql(query)

In [14]:
print(len(df_joined.columns))

20


In [15]:
df_joined.printSchema()

root
 |-- estado: string (nullable = true)
 |-- ano: integer (nullable = true)
 |-- mes: integer (nullable = true)
 |-- semana: integer (nullable = true)
 |-- ano_nascimento: integer (nullable = true)
 |-- faixa_etaria: string (nullable = false)
 |-- sexo: string (nullable = true)
 |-- cor_raca: string (nullable = true)
 |-- tipo_area: string (nullable = true)
 |-- escolaridade: string (nullable = true)
 |-- teve_sintomas_covid: string (nullable = true)
 |-- foi_posto_saude: string (nullable = true)
 |-- ficou_em_casa: string (nullable = true)
 |-- ficou_internado: string (nullable = true)
 |-- foi_ao_posto_ou_internado: string (nullable = true)
 |-- teve_covid: string (nullable = true)
 |-- resultado_covid: string (nullable = true)
 |-- tem_plano_saude: string (nullable = true)
 |-- faixa_rendimento: string (nullable = true)
 |-- situacao_domicilio: string (nullable = true)



In [16]:
save_to_bigquery(df_joined, "SPEC", "tbx001_data")

24/09/30 17:38:01 INFO GhfsStorageStatistics: Detected potential high latency for operation op_get_file_status. latencyMs=662; previousMaxLatencyMs=0; operationCount=1; context=gs://meu-bucket-temporario-spark/.spark-bigquery-local-1727728670946-92487ec5-3579-492d-9777-f9ccd7a1a0b2
24/09/30 17:38:01 INFO DirectBigQueryRelation: |Querying table my-project-1508437523553.SOR.tbx001_data, parameters sent from Spark:|requiredColumns=[UF,Ano,V1013,V1012,A001B3,A003,A004,V1023,A005,B002,B0031,B005,B009B,B009D,B009F,B007,C01011,F001,B0011,B0012,B0013,B0014,B0015,B0016,B0017,B0018,B0019,B00110,B00111,B00112,B00113],|filters=[]
24/09/30 17:38:03 INFO ReadSessionCreator: Read session:{"readSessionName":"projects/my-project-1508437523553/locations/us-east1/sessions/CAISDEVFcnRXVnpIVjMyYxoCdngaAnVo","readSessionCreationStartTime":"2024-09-30T20:38:01.334Z","readSessionCreationEndTime":"2024-09-30T20:38:03.640Z","readSessionPrepDuration":1206,"readSessionCreationDuration":1100,"readSessionDuration":

24/09/30 18:08:03 INFO BlockManagerInfo: Removed broadcast_14_piece0 on 10.0.2.15:34323 in memory (size: 348.0 B, free: 366.2 MiB)
24/09/30 18:08:03 INFO BlockManagerInfo: Removed broadcast_26_piece0 on 10.0.2.15:34323 in memory (size: 244.0 B, free: 366.2 MiB)
24/09/30 18:08:03 INFO BlockManagerInfo: Removed broadcast_28_piece0 on 10.0.2.15:34323 in memory (size: 99.6 KiB, free: 366.3 MiB)
24/09/30 18:08:03 INFO BlockManagerInfo: Removed broadcast_21_piece0 on 10.0.2.15:34323 in memory (size: 359.0 B, free: 366.3 MiB)
24/09/30 18:08:03 INFO BlockManagerInfo: Removed broadcast_25_piece0 on 10.0.2.15:34323 in memory (size: 348.0 B, free: 366.3 MiB)
24/09/30 18:08:03 INFO BlockManagerInfo: Removed broadcast_19_piece0 on 10.0.2.15:34323 in memory (size: 1038.0 B, free: 366.3 MiB)
24/09/30 18:08:03 INFO BlockManagerInfo: Removed broadcast_17_piece0 on 10.0.2.15:34323 in memory (size: 492.0 B, free: 366.3 MiB)
24/09/30 18:08:03 INFO BlockManagerInfo: Removed broadcast_15_piece0 on 10.0.2.15