In [1]:
# Import necessary libraries
import requests
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import io

# Extract Phase

In [2]:
# Step 1: Fetch data from the URL
url = 'https://compras.dados.gov.br/servicos/v1/servicos.csv?descricao=educação'
data = requests.get(url).content

In [3]:
# Step 2: Parse the CSV data into a pandas DataFrame
parse_data = io.StringIO(data.decode('utf-8'))
df = pd.read_csv(parse_data)

# Transform Phase

In [4]:
# Step 3: Create a Spark session
spark = SparkSession.builder.appName('pipeline1').getOrCreate()

In [5]:
# Step 4: Convert the pandas DataFrame to a Spark DataFrame
spark_df = spark.createDataFrame(df)

In [6]:
# Step 5: Print schema and show data
spark_df.printSchema()
spark_df.show()

root
 |-- Código: long (nullable = true)
 |-- Descrição: string (nullable = true)
 |-- Unidade medida: string (nullable = true)
 |-- CPC: long (nullable = true)
 |-- Seção: string (nullable = true)
 |-- Divisão: string (nullable = true)
 |-- Grupo: string (nullable = true)
 |-- Classe: string (nullable = true)
 |-- Subclasse: double (nullable = true)

+------+--------------------+--------------+----+--------------------+--------------------+--------------------+--------------------+---------+
|Código|           Descrição|Unidade medida| CPC|               Seção|             Divisão|               Grupo|              Classe|Subclasse|
+------+--------------------+--------------+----+--------------------+--------------------+--------------------+--------------------+---------+
| 14311|Orientação / Educ...| UNHOMEM/H MÊS|9659|9: SERVIÇOS DA CO...|96: SERVIÇOS RECR...|965: SERVIÇOS REL...|9659: OUTROS ESPO...|      NaN|
| 15946|Serviço Penitenci...|            UN| 979|9: SERVIÇOS DA CO...|

In [7]:
# Step 6: Clean up column names by replacing spaces with underscores and converting to lowercase
cleaned_col_names = [col_name.replace(" ", "_").lower() for col_name in spark_df.columns]
spark_df = spark_df.toDF(*cleaned_col_names)  # Rename all columns at once

In [8]:
# Step 7: Print schema again to see the cleaned names
spark_df.printSchema()

root
 |-- código: long (nullable = true)
 |-- descrição: string (nullable = true)
 |-- unidade_medida: string (nullable = true)
 |-- cpc: long (nullable = true)
 |-- seção: string (nullable = true)
 |-- divisão: string (nullable = true)
 |-- grupo: string (nullable = true)
 |-- classe: string (nullable = true)
 |-- subclasse: double (nullable = true)



In [9]:
# Step 8: Rename specific columns if needed
spark_df = (spark_df.withColumnRenamed("código", 'codigo')
                    .withColumnRenamed("descrição", 'descricao')
                    .withColumnRenamed("seção", 'secao')
                    .withColumnRenamed("divisão", 'divisao')
                    .withColumnRenamed("unidade_medida", 'unidade_medida'))

In [10]:
# Step 9: Print the final list of columns to confirm the changes
print(spark_df.columns)

['codigo', 'descricao', 'unidade_medida', 'cpc', 'secao', 'divisao', 'grupo', 'classe', 'subclasse']


In [11]:
# Step 10: Show the final DataFrame
spark_df.show()

+------+--------------------+--------------+----+--------------------+--------------------+--------------------+--------------------+---------+
|codigo|           descricao|unidade_medida| cpc|               secao|             divisao|               grupo|              classe|subclasse|
+------+--------------------+--------------+----+--------------------+--------------------+--------------------+--------------------+---------+
| 14311|Orientação / Educ...| UNHOMEM/H MÊS|9659|9: SERVIÇOS DA CO...|96: SERVIÇOS RECR...|965: SERVIÇOS REL...|9659: OUTROS ESPO...|      NaN|
| 15946|Serviço Penitenci...|            UN| 979|9: SERVIÇOS DA CO...|97: OUTROS SERVIÇ...|979: OUTROS SERVI...|                 NaN|      NaN|
| 18481|Consultoria e Ass...|            UN|8319|8: SERVIÇOS DE AR...|83: OUTROS SERVIÇ...|831: SERVIÇOS DE ...|8319: OUTROS SERV...|      NaN|
| 19321|Curso / Treinamen...|            UN| 929|9: SERVIÇOS DA CO...|92: SERVIÇOS DA I...|929: OUTROS SERVI...|                 NaN|   

# Load Phase

In [12]:
# Step 11: Mount Google Drive
from google.colab import drive

try:
    drive.mount('/content/drive')
except Exception as e:
    print(f"An error occurred while mounting Google Drive: {e}")

Mounted at /content/drive


In [13]:
# Step 12: Define directory path in Google Drive
dir = '/content/drive/MyDrive/'

In [14]:
# Step 13: Write the Spark DataFrame to CSV in Google Drive
spark_df.write.mode("overwrite")\
        .format("csv")\
        .option('header', 'true')\
        .save(f"{dir}/compras")

In [15]:
# Step 14: Read the CSV file back into a Spark DataFrame
loaded_df = spark.read.csv(f"{dir}/compras", header=True, inferSchema=True)

In [16]:
# Step 15: Show the loaded DataFrame
loaded_df.printSchema()
loaded_df.show()

root
 |-- codigo: integer (nullable = true)
 |-- descricao: string (nullable = true)
 |-- unidade_medida: string (nullable = true)
 |-- cpc: integer (nullable = true)
 |-- secao: string (nullable = true)
 |-- divisao: string (nullable = true)
 |-- grupo: string (nullable = true)
 |-- classe: string (nullable = true)
 |-- subclasse: double (nullable = true)

+------+--------------------+--------------+----+--------------------+--------------------+--------------------+--------------------+---------+
|codigo|           descricao|unidade_medida| cpc|               secao|             divisao|               grupo|              classe|subclasse|
+------+--------------------+--------------+----+--------------------+--------------------+--------------------+--------------------+---------+
| 18481|Consultoria e Ass...|            UN|8319|8: SERVIÇOS DE AR...|83: OUTROS SERVIÇ...|831: SERVIÇOS DE ...|8319: OUTROS SERV...|      NaN|
| 19321|Curso / Treinamen...|            UN| 929|9: SERVIÇOS DA 