In [1]:
!pip install google-auth google-auth-oauthlib google-auth-httplib2 google-cloud-bigquery




[notice] A new release of pip is available: 23.2.1 -> 23.3
[notice] To update, run: python.exe -m pip install --upgrade pip


In [2]:
!pip freeze > ..//requirements.txt

In [2]:
import os
import sys

# findspark patches sys.path from SPARK_HOME; it must run before anything
# (including the local _spark helper) imports pyspark.
import findspark
findspark.init()

# PySpark launches executor Python workers with $PYSPARK_PYTHON. When it is
# unset, Spark falls back to a default executable name. On Windows that can
# resolve to a different or stub interpreter, and tasks then fail with
# "Python worker failed to connect back". Pin the workers to this kernel's
# interpreter unless the user configured one. This must happen before the
# SparkContext is created by get_spark().
os.environ.setdefault('PYSPARK_PYTHON', sys.executable)

from _spark import *
from transformations import transform

import pyspark.sql.functions as f
import pyspark.sql.types as t

spark = get_spark()

# GCS bucket used by the BigQuery connector as temporaryGcsBucket for writes.
gcs_bucket = 'tech-challenge'

In [3]:
# Load the raw CSV extracts under ../data/raw (comma-separated, header row);
# inferSchema makes Spark scan the files to pick column types.
df = spark.read\
    .options(delimiter=',', header=True, inferSchema=True)\
    .csv('../data/raw')

## Google Cloud console links

- [BigQuery Table](https://console.cloud.google.com/bigquery?hl=pt-br&project=fiap-tech-challenge-3&ws=!1m0)
- [Storage](https://console.cloud.google.com/storage/browser/tech-challenge;tab=configuration?hl=pt-br&project=fiap-tech-challenge-3&prefix=&forceOnObjectsSortingFiltering=false)
- [IAM e admin](https://console.cloud.google.com/iam-admin/iam?hl=pt-br&project=fiap-tech-challenge-3)

### Raw Data

#### Fato

In [5]:
# Persist the raw fact DataFrame, as loaded, to BigQuery table
# raw_pnad.tb_f_covid_2020 (overwriting any previous contents).
df.write\
    .format("bigquery")\
    .options(
        temporaryGcsBucket=gcs_bucket,
        credentialsFile=os.environ["GOOGLE_APPLICATION_CREDENTIALS"],
        project="fiap-tech-challenge-3",
        parentProject="fiap-tech-challenge-3",
        dataset='raw_pnad',
        table="tb_f_covid_2020",
    )\
    .mode("overwrite")\
    .save()


#### Dimensão

In [6]:
# Dimension raw_pnad.tb_d_uf: UF code -> state name.
_schema = t.StructType(
    [
          t.StructField('cd', t.StringType())
        , t.StructField('uf', t.StringType())
    ]
)

_uf_map = {
    "11": "Rondônia",
    "12": "Acre",
    "13": "Amazonas",
    "14": "Roraima",
    "15": "Pará",
    "16": "Amapá",
    "17": "Tocantins",
    "21": "Maranhão",
    "22": "Piauí",
    "23": "Ceará",
    "24": "Rio Grande do Norte",
    "25": "Paraíba",
    "26": "Pernambuco",
    "27": "Alagoas",
    "28": "Sergipe",
    "29": "Bahia",
    "31": "Minas Gerais",
    "32": "Espírito Santo",
    "33": "Rio de Janeiro",
    "35": "São Paulo",
    "41": "Paraná",
    "42": "Santa Catarina",
    "43": "Rio Grande do Sul",
    "50": "Mato Grosso do Sul",
    "51": "Mato Grosso",
    "52": "Goiás",
    "53": "Distrito Federal",
}

# createDataFrame matches dict rows to the schema by FIELD NAME ('cd', 'uf'),
# so one {code: name} dict became a single all-null row. Feed (code, name)
# tuples instead: one row per entry.
_uf = spark.createDataFrame(data=list(_uf_map.items()), schema=_schema)

_uf.write\
    .format("bigquery")\
    .option("temporaryGcsBucket", gcs_bucket)\
    .option("credentialsFile",os.environ["GOOGLE_APPLICATION_CREDENTIALS"])\
    .option("project", "fiap-tech-challenge-3")\
    .option("parentProject", "fiap-tech-challenge-3")\
    .option('dataset', 'raw_pnad')\
    .option("table", "tb_d_uf")\
    .mode("overwrite")\
    .save()

In [7]:
# Dimension raw_pnad.tb_d_area_domicilio: code -> urban/rural label.
_schema = t.StructType(
    [
          t.StructField('cd', t.StringType())
        , t.StructField('area_domicilio', t.StringType())
    ]
)

_area_domicilio_map = {
    '1': 'Urbana',
    '2': 'Rural',
}

# Tuples, not a single dict: createDataFrame looks dict rows up by field name
# ('cd', 'area_domicilio'), which previously produced one all-null row.
_area_domicilio = spark.createDataFrame(
    data=list(_area_domicilio_map.items()), schema=_schema)

_area_domicilio.write\
    .format("bigquery")\
    .option("temporaryGcsBucket", gcs_bucket)\
    .option("credentialsFile",os.environ["GOOGLE_APPLICATION_CREDENTIALS"])\
    .option("project", "fiap-tech-challenge-3")\
    .option("parentProject", "fiap-tech-challenge-3")\
    .option('dataset', 'raw_pnad')\
    .option("table", "tb_d_area_domicilio")\
    .mode("overwrite")\
    .save()

In [8]:
# Dimension raw_pnad.tb_d_sexo: code -> sex label.
_schema = t.StructType(
    [
          t.StructField('cd', t.StringType())
        , t.StructField('sexo', t.StringType())
    ]
)

_sexo_map = {
    '1': 'Masculino',
    '2': 'Feminino',
}

# Tuples, not a single dict: createDataFrame looks dict rows up by field name
# ('cd', 'sexo'), which previously produced one all-null row.
_sexo = spark.createDataFrame(data=list(_sexo_map.items()), schema=_schema)

_sexo.write\
    .format("bigquery")\
    .option("temporaryGcsBucket", gcs_bucket)\
    .option("credentialsFile",os.environ["GOOGLE_APPLICATION_CREDENTIALS"])\
    .option("project", "fiap-tech-challenge-3")\
    .option("parentProject", "fiap-tech-challenge-3")\
    .option('dataset', 'raw_pnad')\
    .option("table", "tb_d_sexo")\
    .mode("overwrite")\
    .save()

In [9]:
# Dimension raw_pnad.tb_d_raca: code -> race/colour label.
_schema = t.StructType(
    [
          t.StructField('cd', t.StringType())
        , t.StructField('raca', t.StringType())
    ]
)

_raca_map = {
    '1': 'Branca',
    '2': 'Preta',
    '3': 'Amarela',
    '4': 'Parda',
    '5': 'Indígena',
    '9': 'Ignorado',
}

# Tuples, not a single dict: createDataFrame looks dict rows up by field name
# ('cd', 'raca'), which previously produced one all-null row.
_raca = spark.createDataFrame(data=list(_raca_map.items()), schema=_schema)

_raca.write\
    .format("bigquery")\
    .option("temporaryGcsBucket", gcs_bucket)\
    .option("credentialsFile",os.environ["GOOGLE_APPLICATION_CREDENTIALS"])\
    .option("project", "fiap-tech-challenge-3")\
    .option("parentProject", "fiap-tech-challenge-3")\
    .option('dataset', 'raw_pnad')\
    .option("table", "tb_d_raca")\
    .mode("overwrite")\
    .save()

In [10]:
# Dimension raw_pnad.tb_d_escolaridade: code -> schooling level label.
_schema = t.StructType(
    [
          t.StructField('cd', t.StringType())
        , t.StructField('escolaridade', t.StringType())
    ]
)

_escolaridade_map = {
    '1': 'Sem instrução',
    '2': 'Fundamental incompleto',
    '3': 'Fundamental completa',
    '4': 'Médio incompleto',
    '5': 'Médio completo',
    '6': 'Superior incompleto',
    '7': 'Superior completo',
    '8': 'Pós-graduação, mestrado ou doutorado',
}

# Tuples, not a single dict: createDataFrame looks dict rows up by field name
# ('cd', 'escolaridade'), which previously produced one all-null row.
_escolaridade = spark.createDataFrame(
    data=list(_escolaridade_map.items()), schema=_schema)

_escolaridade.write\
    .format("bigquery")\
    .option("temporaryGcsBucket", gcs_bucket)\
    .option("credentialsFile",os.environ["GOOGLE_APPLICATION_CREDENTIALS"])\
    .option("project", "fiap-tech-challenge-3")\
    .option("parentProject", "fiap-tech-challenge-3")\
    .option('dataset', 'raw_pnad')\
    .option("table", "tb_d_escolaridade")\
    .mode("overwrite")\
    .save()

In [11]:
# Dimension raw_pnad.tb_d_resposta_covid: answer code -> answer label.
# The label column was copy-pasted as 'escolaridade'; it is named after this
# dimension now.
_schema = t.StructType(
    [
          t.StructField('cd', t.StringType())
        , t.StructField('resposta_covid', t.StringType())
    ]
)

# 'Não' had a trailing space, which breaks equality filters/grouping on the label.
_resposta_covid_map = {
    '1': 'Sim',
    '2': 'Não',
    '3': 'Não sabe',
    '9': 'Ignorado',
}

# Tuples, not a single dict: createDataFrame looks dict rows up by field name,
# which previously produced one all-null row.
_resposta_covid = spark.createDataFrame(
    data=list(_resposta_covid_map.items()), schema=_schema)

_resposta_covid.write\
    .format("bigquery")\
    .option("temporaryGcsBucket", gcs_bucket)\
    .option("credentialsFile",os.environ["GOOGLE_APPLICATION_CREDENTIALS"])\
    .option("project", "fiap-tech-challenge-3")\
    .option("parentProject", "fiap-tech-challenge-3")\
    .option('dataset', 'raw_pnad')\
    .option("table", "tb_d_resposta_covid")\
    .mode("overwrite")\
    .save()

In [12]:
# Dimension raw_pnad.tb_d_resposta_internado: answer code -> answer label.
_schema = t.StructType(
    [
          t.StructField('cd', t.StringType())
        , t.StructField('resposta_internado', t.StringType())
    ]
)

# 'Não' had a trailing space, which breaks equality filters/grouping on the label.
_resposta_internado_map = {
    '1': 'Sim',
    '2': 'Não',
    '3': 'Não foi atendido',
    '9': 'Ignorado',
}

# Tuples, not a single dict: createDataFrame looks dict rows up by field name,
# which previously produced one all-null row.
_resposta_internado = spark.createDataFrame(
    data=list(_resposta_internado_map.items()), schema=_schema)

_resposta_internado.write\
    .format("bigquery")\
    .option("temporaryGcsBucket", gcs_bucket)\
    .option("credentialsFile",os.environ["GOOGLE_APPLICATION_CREDENTIALS"])\
    .option("project", "fiap-tech-challenge-3")\
    .option("parentProject", "fiap-tech-challenge-3")\
    .option('dataset', 'raw_pnad')\
    .option("table", "tb_d_resposta_internado")\
    .mode("overwrite")\
    .save()

In [13]:
# Dimension raw_pnad.tb_d_resposta_faixa_rendimento: income-band code -> band label.
_schema = t.StructType(
    [
          t.StructField('cd', t.StringType())
        , t.StructField('resposta_faixa_rendimento', t.StringType())
    ]
)

# NOTE(review): the fact table is read with inferSchema=True, so C01011 may
# land as an integer column (0, 1, ...) rather than '00', '01', ...; joining
# on these zero-padded codes may need an lpad/cast. Confirm.
_resposta_faixa_rendimento_map = {
    '00': '0 - 100',
    '01': '101 - 300',
    '02': '301 - 600',
    '03': '601 - 800',
    '04': '801 - 1.600',
    '05': '1.601 - 3.000',
    '06': '3.001 - 10.000',
    '07': '10.001 - 50.000',
    '08': '50.001 - 100.000',
    '09': 'Mais de 100.000',
}

# Tuples, not a single dict: createDataFrame looks dict rows up by field name,
# which previously produced one all-null row.
_resposta_faixa_rendimento = spark.createDataFrame(
    data=list(_resposta_faixa_rendimento_map.items()), schema=_schema)

_resposta_faixa_rendimento.write\
    .format("bigquery")\
    .option("temporaryGcsBucket", gcs_bucket)\
    .option("credentialsFile",os.environ["GOOGLE_APPLICATION_CREDENTIALS"])\
    .option("project", "fiap-tech-challenge-3")\
    .option("parentProject", "fiap-tech-challenge-3")\
    .option('dataset', 'raw_pnad')\
    .option("table", "tb_d_resposta_faixa_rendimento")\
    .mode("overwrite")\
    .save()

In [14]:
# Dimension raw_pnad.tb_d_resposta_situacao_domicilio: code -> dwelling tenure label.
_schema = t.StructType(
    [
          t.StructField('cd', t.StringType())
        , t.StructField('resposta_situacao_domicilio', t.StringType())
    ]
)

# Trailing spaces removed from several labels; they break equality
# filters/grouping on the label column.
_resposta_situacao_domicilio_map = {
    '1': 'Próprio - já pago',
    '2': 'Próprio - ainda pagando',
    '3': 'Alugado',
    '4': 'Cedido por empregador',
    '5': 'Cedido por familiar',
    '6': 'Cedido de outra forma',
    '7': 'Outra condição',
}

# Tuples, not a single dict: createDataFrame looks dict rows up by field name,
# which previously produced one all-null row.
_resposta_situacao_domicilio = spark.createDataFrame(
    data=list(_resposta_situacao_domicilio_map.items()), schema=_schema)

_resposta_situacao_domicilio.write\
    .format("bigquery")\
    .option("temporaryGcsBucket", gcs_bucket)\
    .option("credentialsFile",os.environ["GOOGLE_APPLICATION_CREDENTIALS"])\
    .option("project", "fiap-tech-challenge-3")\
    .option("parentProject", "fiap-tech-challenge-3")\
    .option('dataset', 'raw_pnad')\
    .option("table", "tb_d_resposta_situacao_domicilio")\
    .mode("overwrite")\
    .save()

In [15]:
# Dimension raw_pnad.tb_d_questoes: survey question code -> readable column name
# (the same names the refined fact uses).
_schema = t.StructType(
    [
          t.StructField('cd', t.StringType())
        , t.StructField('questao', t.StringType())
    ]
)

_mapa_questoes_map = {
    "UF": "uf",
    "V1012": "semana_mes",
    "V1013": "mes",
    "V1022": "area_domicilio",
    "A002": "idade",
    "A003": "sexo",
    "A004": "cor_raca",
    "A005": "escolaridade",
    "B0011": "teve_febre",
    "B0014": "teve_dificuldade_respirar",
    "B0015": "teve_dor_cabeca",
    "B0019": "teve_fadiga",
    "B00111": "teve_perda_cheiro",
    "B002": "foi_posto_saude",
    "B0031": "ficou_em_casa",
    "B005": "ficou_internado",
    "B007": "tem_plano_saude",
    "C007B": "assalariado",
    "C01011": "faixa_rendimento",
    "F001": "situacao_domicilio",
    # Kept in the refined fact as resultado_covid; was missing from this map.
    "B009B": "resultado_covid",
}

# Tuples, not a single dict: createDataFrame looks dict rows up by field name
# ('cd', 'questao'), which previously produced one all-null row.
_mapa_questoes = spark.createDataFrame(
    data=list(_mapa_questoes_map.items()), schema=_schema)

_mapa_questoes.write\
    .format("bigquery")\
    .option("temporaryGcsBucket", gcs_bucket)\
    .option("credentialsFile",os.environ["GOOGLE_APPLICATION_CREDENTIALS"])\
    .option("project", "fiap-tech-challenge-3")\
    .option("parentProject", "fiap-tech-challenge-3")\
    .option('dataset', 'raw_pnad')\
    .option("table", "tb_d_questoes")\
    .mode("overwrite")\
    .save()

In [16]:
# NOTE(review): this cell repeats an earlier cell that also overwrites
# raw_pnad.tb_d_sexo with the same data; it is redundant and can be deleted.
_schema = t.StructType(
    [
          t.StructField('cd', t.StringType())
        , t.StructField('sexo', t.StringType())
    ]
)

_sexo_map = {
    '1': 'Masculino',
    '2': 'Feminino',
}

# Tuples, not a single dict: createDataFrame looks dict rows up by field name
# ('cd', 'sexo'), which previously produced one all-null row.
_sexo = spark.createDataFrame(data=list(_sexo_map.items()), schema=_schema)

_sexo.write\
    .format("bigquery")\
    .option("temporaryGcsBucket", gcs_bucket)\
    .option("credentialsFile",os.environ["GOOGLE_APPLICATION_CREDENTIALS"])\
    .option("project", "fiap-tech-challenge-3")\
    .option("parentProject", "fiap-tech-challenge-3")\
    .option('dataset', 'raw_pnad')\
    .option("table", "tb_d_sexo")\
    .mode("overwrite")\
    .save()

### Refined Data

#### Fato

In [17]:
# Refined fact: keep only the survey columns of interest and give them readable
# names, then overwrite refined_pnad.tb_f_covid_2020.
# Insertion order of this mapping defines the output column order.
_column_names = {
    "UF": "uf",
    "V1012": "semana_mes",
    "V1013": "mes",
    "V1022": "area_domicilio",
    "A002": "idade",
    "A003": "sexo",
    "A004": "cor_raca",
    "A005": "escolaridade",
    "B0011": "teve_febre",
    "B0014": "teve_dificuldade_respirar",
    "B0015": "teve_dor_cabeca",
    "B0019": "teve_fadiga",
    "B00111": "teve_perda_cheiro",
    "B002": "foi_posto_saude",
    "B0031": "ficou_em_casa",
    "B005": "ficou_internado",
    "B007": "tem_plano_saude",
    "C007B": "assalariado",
    "C01011": "faixa_rendimento",
    "F001": "situacao_domicilio",
    "B009B": "resultado_covid",
}
columns = list(_column_names)

# select() fixes the order; toDF() then renames positionally.
df = spark.read\
    .options(delimiter=',', header=True, inferSchema=True)\
    .csv('../data/raw')\
    .select(columns)\
    .toDF(*_column_names.values())

df.write\
    .format("bigquery")\
    .options(
        temporaryGcsBucket=gcs_bucket,
        credentialsFile=os.environ["GOOGLE_APPLICATION_CREDENTIALS"],
        project="fiap-tech-challenge-3",
        parentProject="fiap-tech-challenge-3",
        dataset='refined_pnad',
        table="tb_f_covid_2020",
    )\
    .mode("overwrite")\
    .save()

#### Dimensão

In [18]:
# Dimension refined_pnad.tb_d_uf: UF code -> state name.
_schema = t.StructType(
    [
          t.StructField('cd', t.StringType())
        , t.StructField('uf', t.StringType())
    ]
)

_uf_map = {
    "11": "Rondônia",
    "12": "Acre",
    "13": "Amazonas",
    "14": "Roraima",
    "15": "Pará",
    "16": "Amapá",
    "17": "Tocantins",
    "21": "Maranhão",
    "22": "Piauí",
    "23": "Ceará",
    "24": "Rio Grande do Norte",
    "25": "Paraíba",
    "26": "Pernambuco",
    "27": "Alagoas",
    "28": "Sergipe",
    "29": "Bahia",
    "31": "Minas Gerais",
    "32": "Espírito Santo",
    "33": "Rio de Janeiro",
    "35": "São Paulo",
    "41": "Paraná",
    "42": "Santa Catarina",
    "43": "Rio Grande do Sul",
    "50": "Mato Grosso do Sul",
    "51": "Mato Grosso",
    "52": "Goiás",
    "53": "Distrito Federal",
}

# createDataFrame matches dict rows to the schema by FIELD NAME ('cd', 'uf'),
# so one {code: name} dict became a single all-null row. Feed (code, name)
# tuples instead: one row per entry.
_uf = spark.createDataFrame(data=list(_uf_map.items()), schema=_schema)

_uf.write\
    .format("bigquery")\
    .option("temporaryGcsBucket", gcs_bucket)\
    .option("credentialsFile",os.environ["GOOGLE_APPLICATION_CREDENTIALS"])\
    .option("project", "fiap-tech-challenge-3")\
    .option("parentProject", "fiap-tech-challenge-3")\
    .option('dataset', 'refined_pnad')\
    .option("table", "tb_d_uf")\
    .mode("overwrite")\
    .save()

In [4]:
# Dimension refined_pnad.tb_d_area_domicilio: code -> urban/rural label.
_schema = t.StructType(
    [
          t.StructField('cd', t.StringType())
        , t.StructField('area_domicilio', t.StringType())
    ]
)

_area_domicilio_map = {
    '1': 'Urbana',
    '2': 'Rural',
}

# Tuples, not a single dict: createDataFrame looks dict rows up by field name
# ('cd', 'area_domicilio'), which previously produced one all-null row.
_area_domicilio = spark.createDataFrame(
    data=list(_area_domicilio_map.items()), schema=_schema)

_area_domicilio.write\
    .format("bigquery")\
    .option("temporaryGcsBucket", gcs_bucket)\
    .option("credentialsFile",os.environ["GOOGLE_APPLICATION_CREDENTIALS"])\
    .option("project", "fiap-tech-challenge-3")\
    .option("parentProject", "fiap-tech-challenge-3")\
    .option('dataset', 'refined_pnad')\
    .option("table", "tb_d_area_domicilio")\
    .mode("overwrite")\
    .save()

Py4JJavaError: An error occurred while calling o711.save.
: com.google.cloud.bigquery.connector.common.BigQueryConnectorException: Failed to write to BigQuery
	at com.google.cloud.spark.bigquery.write.BigQueryWriteHelper.writeDataFrameToBigQuery(BigQueryWriteHelper.java:111)
	at com.google.cloud.spark.bigquery.write.BigQueryDeprecatedIndirectInsertableRelation.insert(BigQueryDeprecatedIndirectInsertableRelation.java:43)
	at com.google.cloud.spark.bigquery.write.CreatableRelationProviderHelper.createRelation(CreatableRelationProviderHelper.java:54)
	at com.google.cloud.spark.bigquery.BigQueryRelationProvider.createRelation(BigQueryRelationProvider.scala:107)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:47)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:512)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:512)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:488)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:133)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:856)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:387)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:360)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:247)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 30 in stage 5.0 failed 1 times, most recent failure: Lost task 30.0 in stage 5.0 (TID 128) (DESKTOP-1B13685 executor driver): org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:192)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:166)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.net.DualStackPlainSocketImpl.waitForNewConnection(Native Method)
	at java.net.DualStackPlainSocketImpl.socketAccept(DualStackPlainSocketImpl.java:131)
	at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:535)
	at java.net.PlainSocketImpl.accept(PlainSocketImpl.java:189)
	at java.net.ServerSocket.implAccept(ServerSocket.java:545)
	at java.net.ServerSocket.accept(ServerSocket.java:513)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:179)
	... 30 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2785)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2721)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2720)
	at scala.collection.immutable.List.foreach(List.scala:333)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2720)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1206)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1206)
	at scala.Option.foreach(Option.scala:437)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1206)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2984)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2923)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2912)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:971)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2263)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeWrite$4(FileFormatWriter.scala:307)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.writeAndCommit(FileFormatWriter.scala:271)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeWrite(FileFormatWriter.scala:304)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:190)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:190)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:512)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:512)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:488)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:133)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:856)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:387)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:360)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
	at com.google.cloud.spark.bigquery.write.BigQueryWriteHelper.writeDataFrameToBigQuery(BigQueryWriteHelper.java:106)
	... 44 more
Caused by: org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:192)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:166)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.net.DualStackPlainSocketImpl.waitForNewConnection(Native Method)
	at java.net.DualStackPlainSocketImpl.socketAccept(DualStackPlainSocketImpl.java:131)
	at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:535)
	at java.net.PlainSocketImpl.accept(PlainSocketImpl.java:189)
	at java.net.ServerSocket.implAccept(ServerSocket.java:545)
	at java.net.ServerSocket.accept(ServerSocket.java:513)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:179)
	... 30 more


In [20]:
# Dimension refined_pnad.tb_d_sexo: code -> sex label.
_schema = t.StructType(
    [
          t.StructField('cd', t.StringType())
        , t.StructField('sexo', t.StringType())
    ]
)

_sexo_map = {
    '1': 'Masculino',
    '2': 'Feminino',
}

# Tuples, not a single dict: createDataFrame looks dict rows up by field name
# ('cd', 'sexo'), which previously produced one all-null row.
_sexo = spark.createDataFrame(data=list(_sexo_map.items()), schema=_schema)

_sexo.write\
    .format("bigquery")\
    .option("temporaryGcsBucket", gcs_bucket)\
    .option("credentialsFile",os.environ["GOOGLE_APPLICATION_CREDENTIALS"])\
    .option("project", "fiap-tech-challenge-3")\
    .option("parentProject", "fiap-tech-challenge-3")\
    .option('dataset', 'refined_pnad')\
    .option("table", "tb_d_sexo")\
    .mode("overwrite")\
    .save()

In [21]:
# Dimension refined_pnad.tb_d_raca: code -> race/colour label.
_schema = t.StructType(
    [
          t.StructField('cd', t.StringType())
        , t.StructField('raca', t.StringType())
    ]
)

_raca_map = {
    '1': 'Branca',
    '2': 'Preta',
    '3': 'Amarela',
    '4': 'Parda',
    '5': 'Indígena',
    '9': 'Ignorado',
}

# Tuples, not a single dict: createDataFrame looks dict rows up by field name
# ('cd', 'raca'), which previously produced one all-null row.
_raca = spark.createDataFrame(data=list(_raca_map.items()), schema=_schema)

_raca.write\
    .format("bigquery")\
    .option("temporaryGcsBucket", gcs_bucket)\
    .option("credentialsFile",os.environ["GOOGLE_APPLICATION_CREDENTIALS"])\
    .option("project", "fiap-tech-challenge-3")\
    .option("parentProject", "fiap-tech-challenge-3")\
    .option('dataset', 'refined_pnad')\
    .option("table", "tb_d_raca")\
    .mode("overwrite")\
    .save()

In [22]:
# Dimension refined_pnad.tb_d_escolaridade: code -> schooling level label.
_schema = t.StructType(
    [
          t.StructField('cd', t.StringType())
        , t.StructField('escolaridade', t.StringType())
    ]
)

_escolaridade_map = {
    '1': 'Sem instrução',
    '2': 'Fundamental incompleto',
    '3': 'Fundamental completa',
    '4': 'Médio incompleto',
    '5': 'Médio completo',
    '6': 'Superior incompleto',
    '7': 'Superior completo',
    '8': 'Pós-graduação, mestrado ou doutorado',
}

# Tuples, not a single dict: createDataFrame looks dict rows up by field name
# ('cd', 'escolaridade'), which previously produced one all-null row.
_escolaridade = spark.createDataFrame(
    data=list(_escolaridade_map.items()), schema=_schema)

_escolaridade.write\
    .format("bigquery")\
    .option("temporaryGcsBucket", gcs_bucket)\
    .option("credentialsFile",os.environ["GOOGLE_APPLICATION_CREDENTIALS"])\
    .option("project", "fiap-tech-challenge-3")\
    .option("parentProject", "fiap-tech-challenge-3")\
    .option('dataset', 'refined_pnad')\
    .option("table", "tb_d_escolaridade")\
    .mode("overwrite")\
    .save()

In [23]:
# The label column is named after the dimension (`resposta_covid`), like the
# other refined_pnad lookup tables; it was previously copy-pasted as
# 'escolaridade'.
_schema = t.StructType(
    [
          t.StructField('cd', t.StringType())
        , t.StructField('resposta_covid', t.StringType())
    ]
)

# Code -> label lookup for COVID answers. Labels carry no trailing whitespace,
# so they match values such as 'Não' seen in the trusted fact table.
_resposta_covid_labels = {
    '1': 'Sim',
    '2': 'Não',
    '3': 'Não sabe',
    '9': 'Ignorado',
}

# Rows must be (cd, label) tuples. A dict row is matched against the schema's
# field names, so a dict keyed by code would yield a single all-NULL row.
_resposta_covid = spark.createDataFrame(
    data=list(_resposta_covid_labels.items()), schema=_schema
)

# Overwrite refined_pnad.tb_d_resposta_covid in BigQuery, using `gcs_bucket` as
# the connector's temporary bucket.
_resposta_covid.write\
    .format("bigquery")\
    .option("temporaryGcsBucket", gcs_bucket)\
    .option("credentialsFile",os.environ["GOOGLE_APPLICATION_CREDENTIALS"])\
    .option("project", "fiap-tech-challenge-3")\
    .option("parentProject", "fiap-tech-challenge-3")\
    .option('dataset', 'refined_pnad')\
    .option("table", "tb_d_resposta_covid")\
    .mode("overwrite")\
    .save()

In [24]:
_schema = t.StructType(
    [
          t.StructField('cd', t.StringType())
        , t.StructField('resposta_internado', t.StringType())
    ]
)

# Code -> label lookup for hospitalisation answers. Labels carry no trailing
# whitespace ('Não', not 'Não ').
_resposta_internado_labels = {
    '1': 'Sim',
    '2': 'Não',
    '3': 'Não foi atendido',
    '9': 'Ignorado',
}

# Rows must be (cd, label) tuples. A dict row is matched against the schema's
# field names, so a dict keyed by code would yield a single all-NULL row.
_resposta_internado = spark.createDataFrame(
    data=list(_resposta_internado_labels.items()), schema=_schema
)

# Overwrite refined_pnad.tb_d_resposta_internado in BigQuery, using
# `gcs_bucket` as the connector's temporary bucket.
_resposta_internado.write\
    .format("bigquery")\
    .option("temporaryGcsBucket", gcs_bucket)\
    .option("credentialsFile",os.environ["GOOGLE_APPLICATION_CREDENTIALS"])\
    .option("project", "fiap-tech-challenge-3")\
    .option("parentProject", "fiap-tech-challenge-3")\
    .option('dataset', 'refined_pnad')\
    .option("table", "tb_d_resposta_internado")\
    .mode("overwrite")\
    .save()

In [25]:
_schema = t.StructType(
    [
          t.StructField('cd', t.StringType())
        , t.StructField('resposta_faixa_rendimento', t.StringType())
    ]
)

# Code -> label lookup for income ranges. Codes are zero-padded strings.
_resposta_faixa_rendimento_labels = {
    '00': '0 - 100',
    '01': '101 - 300',
    '02': '301 - 600',
    '03': '601 - 800',
    '04': '801 - 1.600',
    '05': '1.601 - 3.000',
    '06': '3.001 - 10.000',
    '07': '10.001 - 50.000',
    '08': '50.001 - 100.000',
    '09': 'Mais de 100.000',
}

# Rows must be (cd, label) tuples. A dict row is matched against the schema's
# field names, so a dict keyed by code would yield a single all-NULL row.
_resposta_faixa_rendimento = spark.createDataFrame(
    data=list(_resposta_faixa_rendimento_labels.items()), schema=_schema
)

# Overwrite refined_pnad.tb_d_resposta_faixa_rendimento in BigQuery, using
# `gcs_bucket` as the connector's temporary bucket.
_resposta_faixa_rendimento.write\
    .format("bigquery")\
    .option("temporaryGcsBucket", gcs_bucket)\
    .option("credentialsFile",os.environ["GOOGLE_APPLICATION_CREDENTIALS"])\
    .option("project", "fiap-tech-challenge-3")\
    .option("parentProject", "fiap-tech-challenge-3")\
    .option('dataset', 'refined_pnad')\
    .option("table", "tb_d_resposta_faixa_rendimento")\
    .mode("overwrite")\
    .save()

In [26]:
_schema = t.StructType(
    [
          t.StructField('cd', t.StringType())
        , t.StructField('resposta_situacao_domicilio', t.StringType())
    ]
)

# Code -> label lookup for household situation. Labels carry no trailing
# whitespace, so filters/joins on the label text behave predictably.
_resposta_situacao_domicilio_labels = {
    '1': 'Próprio - já pago',
    '2': 'Próprio - ainda pagando',
    '3': 'Alugado',
    '4': 'Cedido por empregador',
    '5': 'Cedido por familiar',
    '6': 'Cedido de outra forma',
    '7': 'Outra condição',
}

# Rows must be (cd, label) tuples. A dict row is matched against the schema's
# field names, so a dict keyed by code would yield a single all-NULL row.
_resposta_situacao_domicilio = spark.createDataFrame(
    data=list(_resposta_situacao_domicilio_labels.items()), schema=_schema
)

# Overwrite refined_pnad.tb_d_resposta_situacao_domicilio in BigQuery, using
# `gcs_bucket` as the connector's temporary bucket.
_resposta_situacao_domicilio.write\
    .format("bigquery")\
    .option("temporaryGcsBucket", gcs_bucket)\
    .option("credentialsFile",os.environ["GOOGLE_APPLICATION_CREDENTIALS"])\
    .option("project", "fiap-tech-challenge-3")\
    .option("parentProject", "fiap-tech-challenge-3")\
    .option('dataset', 'refined_pnad')\
    .option("table", "tb_d_resposta_situacao_domicilio")\
    .mode("overwrite")\
    .save()

In [27]:
_schema = t.StructType(
    [
          t.StructField('cd', t.StringType())
        , t.StructField('questao', t.StringType())
    ]
)

# Raw survey column code -> readable column name used by the project.
_questoes_labels = {
    "UF": "uf",
    "V1012": "semana_mes",
    "V1013": "mes",
    "V1022": "area_domicilio",
    "A002": "idade",
    "A003": "sexo",
    "A004": "cor_raca",
    "A005": "escolaridade",
    "B0011": "teve_febre",
    "B0014": "teve_dificuldade_respirar",
    "B0015": "teve_dor_cabeca",
    "B0019": "teve_fadiga",
    "B00111": "teve_perda_cheiro",
    "B002": "foi_posto_saude",
    "B0031": "ficou_em_casa",
    "B005": "ficou_internado",
    "B007": "tem_plano_saude",
    "C007B": "assalariado",
    "C01011": "faixa_rendimento",
    "F001": "situacao_domicilio",
}

# Rows must be (cd, questao) tuples. A dict row is matched against the schema's
# field names ('cd', 'questao'), so a dict keyed by column code would yield a
# single all-NULL row instead of one row per question.
_mapa_questoes = spark.createDataFrame(
    data=list(_questoes_labels.items()), schema=_schema
)

# Overwrite refined_pnad.tb_d_questoes in BigQuery, using `gcs_bucket` as the
# connector's temporary bucket.
_mapa_questoes.write\
    .format("bigquery")\
    .option("temporaryGcsBucket", gcs_bucket)\
    .option("credentialsFile",os.environ["GOOGLE_APPLICATION_CREDENTIALS"])\
    .option("project", "fiap-tech-challenge-3")\
    .option("parentProject", "fiap-tech-challenge-3")\
    .option('dataset', 'refined_pnad')\
    .option("table", "tb_d_questoes")\
    .mode("overwrite")\
    .save()

In [28]:
# NOTE(review): this cell repeats the earlier tb_d_sexo cell (In [20]) and
# overwrites the same table with the same content; one of the two can be
# removed.
_schema = t.StructType(
    [
          t.StructField('cd', t.StringType())
        , t.StructField('sexo', t.StringType())
    ]
)

# Code -> label lookup for the `sexo` dimension.
_sexo_labels = {
    '1': 'Masculino',
    '2': 'Feminino',
}

# Rows must be (cd, label) tuples. A dict row is matched against the schema's
# field names ('cd', 'sexo'), so a dict keyed by code would yield a single
# all-NULL row instead of one row per code.
_sexo = spark.createDataFrame(data=list(_sexo_labels.items()), schema=_schema)

# Overwrite refined_pnad.tb_d_sexo in BigQuery, using `gcs_bucket` as the
# connector's temporary bucket.
_sexo.write\
    .format("bigquery")\
    .option("temporaryGcsBucket", gcs_bucket)\
    .option("credentialsFile",os.environ["GOOGLE_APPLICATION_CREDENTIALS"])\
    .option("project", "fiap-tech-challenge-3")\
    .option("parentProject", "fiap-tech-challenge-3")\
    .option('dataset', 'refined_pnad')\
    .option("table", "tb_d_sexo")\
    .mode("overwrite")\
    .save()

### Trusted Data

#### Fato

In [1]:
from transformations import transform
from _spark import get_spark, _display
import os
import pyspark.sql.functions as f
import pyspark.sql.types as t
import findspark
findspark.init()

# Session and staging bucket for the trusted layer (this cell re-creates them
# so it can run on a freshly restarted kernel).
spark = get_spark()
gcs_bucket =  'tech-challenge'

# Raw CSV files under ../data/raw: comma-delimited, header row, types inferred.
csv_reader_options = {
    'delimiter': ',',
    'header': True,
    'inferSchema': True,
}
df = spark.read.options(**csv_reader_options).csv('../data/raw')

# Apply the trusted-layer transformation imported from `transformations`.
df = transform(df)

# Overwrite trusted_pnad.tb_f_covid_2020 in BigQuery; credentials come from the
# GOOGLE_APPLICATION_CREDENTIALS environment variable.
bigquery_writer_options = {
    "temporaryGcsBucket": gcs_bucket,
    "credentialsFile": os.environ["GOOGLE_APPLICATION_CREDENTIALS"],
    "project": "fiap-tech-challenge-3",
    "parentProject": "fiap-tech-challenge-3",
    'dataset': 'trusted_pnad',
    "table": "tb_f_covid_2020",
}
(
    df.write
    .format("bigquery")
    .options(**bigquery_writer_options)
    .mode("overwrite")
    .save()
)


In [2]:
# Row count per resultado_covid value in the transformed fact DataFrame.
# f.count('uf') counts rows with a non-null uf; the null group collects rows
# where resultado_covid is missing.
df.groupBy('resultado_covid').agg(f.count('uf')).toPandas()

Unnamed: 0,resultado_covid,count(uf)
0,,737590
1,Não,314989
2,Sim,95811
3,Não sabe,174
4,Ignorado,633


In [5]:
# Total number of rows in the transformed fact DataFrame.
df.count()