In [1]:
import argparse
from pyspark.sql import SparkSession
from pyspark.sql import functions as functions
from pyspark.sql.functions import regexp_replace, when, year, month, to_date, col
from pyspark.sql.types import StringType, IntegerType, FloatType
import re

In [2]:
def start_or_create_spark():
    from pyspark.sql import SparkSession
    spark = (SparkSession
             .builder
             .appName("Proc de Dados de Gasolina no Brasil")
             .config('spark.jars', 'gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.23.2.jar')
             .getOrCreate()
             )
    return spark

In [3]:
def rename_columns(dataframe):
    """
    Função que recebe o Dataframe Spark e realize o rename de colunas acordo
    com a documentação do BigQuery: https://cloud.google.com/bigquery/docs/schemas#:~:text=description%20and%20mode.-,Column%20names,name%20length%20is%20300%20characters.
    e retorne o Dataframe com as colunas renomeadas.
    Tip: withColumnRenamed
    """
    dataframe = dataframe.select([functions.col(x).alias(x.lower()) for x in dataframe.columns])
    dataframe = dataframe.select([functions.col(col).alias(re.sub(" -", "", col)) for col in dataframe.columns])
    dataframe = dataframe.select([functions.col(col).alias(re.sub(" ", "_", col)) for col in dataframe.columns])

    return dataframe


In [4]:
def add_year(dataframe, coluna):
    """
    Parametros: dataframe, coluna

    Função que recebe um Dataframe Spark e o nome da coluna que será baseada para criar
    uma coluna `year` no Dataframe com os dados lidos do GCS.
    O resultado da coluna deverá ser o Ano.
    E retorne o dataframe com os dados e a nova coluna criada.
    """
    dataframe = dataframe.withColumn("data", to_date(col(coluna), "dd/MM/yyyy"))
    dataframe = dataframe.withColumn("ano", year(col("data")))

    return dataframe

In [5]:
def add_semestre(dataframe, coluna):
    """
    Parametros: dataframe, coluna

    Função que recebe um Dataframe Spark e o nome da coluna que será baseada para criar
    uma coluna `semestre` no Dataframe com os dados lidos do GCS. O resultado deverá ser 1 ou 2.
    E retorne o dataframe com os dados e a nova coluna criada.
    """

    dataframe = dataframe.withColumn('semestre', month(coluna))
    dataframe = dataframe.withColumn('semestre', when(col("semestre") < 7, 1)
                                     .when(col("semestre") >= 7, 2))

    return dataframe

In [6]:
def add_filename_input(dataframe):
    """
    Parametros: dataframe

    Função que recebe um Dataframe Spark que crie uma coluna `input_file_name` que será baseada no nome do arquivo lido.
    E retorne o dataframe com os dados e a nova coluna criada.
    """

    dataframe = dataframe.withColumn("input_file_name", functions.input_file_name())

    return dataframe

In [7]:
def put_file_gcs(dataframe, path_output, formato):
    """
    :param path_output: path para save dos dados
    :param dataframe: conjunto de dados a serem salvos
    :param formato: tipo de arquivo a ser salvo
    :return: None

    Função que salve os dados no GCS, utilizando o metodo write do Dataframe Spark.
    """
    dataframe.repartition(1).write.format(formato).mode("overwrite").save(path_output)

    return None

In [8]:
def write_bigquery(dataframe, bq_dataset, bq_table, gcs_tmp_bucket):
    """
    Crie uma função que receba os parametros:
    :param dataframe: conjunto de dados a serem salvos
    :param tabela: Tabela do BigQuery que será salvo os dados. Ex: dataset.tabela_exemplo
    :param temporaryGcsBucket: Bucket temporário para salvar area de staging do BigQuery.

    E escreva dentro do BigQuery.
    Utilize o material de referencia:
    https://cloud.google.com/dataproc/docs/tutorials/bigquery-connector-spark-example#running_the_code
    """

    # spark.conf.set('temporaryGcsBucket', gcs_tmp_bucket)
    dataframe.write \
        .format("bigquery") \
        .option("table", "{}.{}".format(bq_dataset, bq_table)) \
        .option("temporaryGcsBucket", gcs_tmp_bucket) \
        .mode('append') \
        .save()

    return None


In [18]:
def main(path_input, path_output, file_format, bq_dataset, bq_table, gcs_tmp_bucket):
    try:
        """
        Crie uma função main que receba como parametro:
        path_input: Caminho dos dados no GCS gerados pela API coletora. Ex: gs://bucket_name/file_name
        path_output: Caminho de onde será salvo os dados processados. Ex: gs://bucket_name_2/file_name
        formato_file_save: Formato de arquivo a ser salvo no path_output. Ex: PARQUET
        tabela_bq: Tabela do BigQuery que será salvo os dados. Ex: dataset.tabela_exemplo


        1 - Faça a leitura dos dados de acordo com o path_input informado
        2 - Realize o rename de colunas do arquivo, respeitando os padroes do BigQuery
        3 - Adicione uma coluna de Ano, baseado na coluna `Data da Coleta`
        4 - Adicione uma coluna de Semestre, baseado na coluna de `Data da Coleta`
        5 - Adicione uma coluna Filename. Tip: pyspark.sql.functions.input_file_name
        6 - Faça o parse dos dados lidos de acordo com a tabela no BigQuery
        7 - Escreva os dados no Bucket GCS, no caminho informado `path_output` 
            no formato especificado no atributo `formato_file_save`.
        8 - Escreva os dados no BigQuery de acordo com a tabela especificada no atributo `tabela_bq`
        """
        spark = start_or_create_spark()
        df = spark.read.format('csv').option("header", "true").option('delimiter', ';').load(path_input)
        df = rename_columns(df)
        df = add_year(df, "data_da_coleta")
        df = add_semestre(df, "data")
        df = add_filename_input(df)
        df = df.withColumn("numero_rua", col("numero_rua").cast(IntegerType())) \
            .withColumn("ano", col("ano").cast(StringType())) \
            .withColumn("semestre", col("semestre").cast(StringType())) \
            .withColumn('valor_de_venda', regexp_replace('valor_de_venda', ',', '.').cast(FloatType())) \
            .withColumn('valor_de_compra', regexp_replace('valor_de_compra', ',', '.').cast(FloatType()))
        put_file_gcs(df, path_output, file_format)
        write_bigquery(df, bq_dataset, bq_table, gcs_tmp_bucket)

        return df
    except Exception as ex:
        print(ex)

In [19]:
if __name__ == '__main__':
    data = main(path_input=f"gs://gcp-datapipeline-389020-raw/combustiveis/2019/02/ca-2019-02.csv",
                path_output=f"gs://gcp-datapipeline-389020-curated/combustiveis/2019/02",
                file_format="parquet",
                bq_dataset="gasolina_brasil",
                bq_table="tb_historico_combustivel_brasil",
                gcs_tmp_bucket="gcp-datapipeline-389020-pyspark-tmp")

                                                                                

In [12]:
data.limit(5).toPandas()

Unnamed: 0,regiao_sigla,estado_sigla,municipio,revenda,cnpj_da_revenda,nome_da_rua,numero_rua,complemento,bairro,cep,produto,data_da_coleta,valor_de_venda,valor_de_compra,unidade_de_medida,bandeira,data,ano,semestre,input_file_name
0,S,RS,CANOAS,METROPOLITANO COMERCIO DE COMBUSTIVEIS LTDA,88.587.589/0001-17,AVENIDA GUILHERME SCHELL,6340.0,,CENTRO,92310-000,GASOLINA,01/07/2019,4.259,,R$ / litro,BRANCA,2019-07-01,2019,2,gs://gcp-datapipeline-389020-raw/combustiveis/...
1,S,RS,CANOAS,METROPOLITANO COMERCIO DE COMBUSTIVEIS LTDA,88.587.589/0001-17,AVENIDA GUILHERME SCHELL,6340.0,,CENTRO,92310-000,ETANOL,01/07/2019,4.099,,R$ / litro,BRANCA,2019-07-01,2019,2,gs://gcp-datapipeline-389020-raw/combustiveis/...
2,S,RS,CANOAS,METROPOLITANO COMERCIO DE COMBUSTIVEIS LTDA,88.587.589/0001-17,AVENIDA GUILHERME SCHELL,6340.0,,CENTRO,92310-000,GNV,01/07/2019,3.449,,R$ / m³,BRANCA,2019-07-01,2019,2,gs://gcp-datapipeline-389020-raw/combustiveis/...
3,NE,BA,ITABUNA,LOPES LEMOS COMERCIO DE COMBUSTIVEIS LTDA,00.231.792/0001-05,RODOVIA BR 101,,KM 503 5,MANOEL LEAO,45601-402,GASOLINA,01/07/2019,4.69,,R$ / litro,BRANCA,2019-07-01,2019,2,gs://gcp-datapipeline-389020-raw/combustiveis/...
4,NE,BA,ITABUNA,LOPES LEMOS COMERCIO DE COMBUSTIVEIS LTDA,00.231.792/0001-05,RODOVIA BR 101,,KM 503 5,MANOEL LEAO,45601-402,ETANOL,01/07/2019,3.49,,R$ / litro,BRANCA,2019-07-01,2019,2,gs://gcp-datapipeline-389020-raw/combustiveis/...


In [14]:
data.dtypes

[('regiao_sigla', 'string'),
 ('estado_sigla', 'string'),
 ('municipio', 'string'),
 ('revenda', 'string'),
 ('cnpj_da_revenda', 'string'),
 ('nome_da_rua', 'string'),
 ('numero_rua', 'int'),
 ('complemento', 'string'),
 ('bairro', 'string'),
 ('cep', 'string'),
 ('produto', 'string'),
 ('data_da_coleta', 'string'),
 ('valor_de_venda', 'float'),
 ('valor_de_compra', 'float'),
 ('unidade_de_medida', 'string'),
 ('bandeira', 'string'),
 ('data', 'date'),
 ('ano', 'string'),
 ('semestre', 'string'),
 ('input_file_name', 'string')]

In [17]:
data.groupBy("data").count().show(truncate=False)

                                                                                

+----------+-----+
|data      |count|
+----------+-----+
|2019-07-30|3857 |
|2019-08-05|5694 |
|2019-08-15|2685 |
|null      |1    |
|2019-08-13|2212 |
|2019-07-08|4888 |
|2019-07-11|3788 |
|2019-07-25|1995 |
|2019-07-23|4190 |
|2019-07-03|5808 |
|2019-07-22|6157 |
|2019-07-10|6078 |
|2019-07-16|2889 |
|2019-07-04|2463 |
|2019-07-31|6485 |
|2019-07-15|6034 |
|2019-07-17|6976 |
|2019-07-02|5331 |
|2019-07-09|4316 |
|2019-07-01|5420 |
+----------+-----+
only showing top 20 rows

