<div style="background-color: #222; padding: 24px;">
    <h1 style="color: #d4bbff; margin-bottom: 8px;">Bronze to Gold: Food Item Categorization Pipeline</h1>
    <h3 style="color: #fff; margin-top: 0;">Categorize and export food items from bronze to gold layer.</h3>
</div>

In [1]:
from pyspark.sql import SparkSession
from pyspark import SparkConf
import os
from dotenv import load_dotenv

# Load environment variables from a .env file in the current working directory
env_path = os.path.join(os.getcwd(), '.env')
load_dotenv(dotenv_path=env_path)

# Data-layer locations. Each one can be overridden through an environment
# variable (including one set by the .env file loaded above); when no override
# is present the original hard-coded paths are used.
INPUT_DATA_PATH = os.getenv("INPUT_DATA_PATH", "/home/jovyan/data/bronze")
OUTPUT_DATA_PATH = os.getenv("OUTPUT_DATA_PATH", "/home/jovyan/data/gold")

# Verify directories exist and are accessible
def verify_directory(path):
    """Ensure ``path`` is a directory this process can read, write and enter.

    A missing path is created (including parents) and its mode is set to
    0o777. An existing path is left untouched and only checked.

    Args:
        path (str): Directory to verify.

    Raises:
        NotADirectoryError: If ``path`` exists but is not a directory.
        PermissionError: If the directory is not readable, writable and
            searchable by the current process.
    """
    if not os.path.exists(path):
        os.makedirs(path, exist_ok=True)
        # RWX for all; only the leaf directory is affected, parents keep the
        # mode makedirs gave them.
        # NOTE(review): presumably needed so other users/containers can write
        # here - confirm this is still required.
        os.chmod(path, 0o777)
    elif not os.path.isdir(path):
        # A regular file at this location would pass a plain access check and
        # only fail later when something tries to write into it.
        raise NotADirectoryError(f"Path exists but is not a directory: {path}")

    # X_OK is part of the check because creating or listing entries inside a
    # directory requires search permission in addition to read/write.
    if not os.access(path, os.R_OK | os.W_OK | os.X_OK):
        raise PermissionError(f"Insufficient permissions for path: {path}")

# Check the bronze (input) and gold (output) directories before starting Spark
try:
    for data_dir in (INPUT_DATA_PATH, OUTPUT_DATA_PATH):
        verify_directory(data_dir)
except Exception as e:
    print(f"Directory verification failed: {str(e)}")
    raise

# Spark master URL comes from the environment, with a fixed fallback
spark_master = os.getenv("SPARK_MASTER", "spark://spark-master:7077")

# Session configuration, applied in one call as (key, value) pairs
conf = SparkConf().setAll([
    ("spark.hadoop.fs.permissions.umask-mode", "000"),
    ("spark.sql.sources.ignoreNonExistentPaths", "true"),
    ("spark.executor.extraJavaOptions", "-Djava.io.tmpdir=/tmp"),
    ("spark.driver.extraJavaOptions", "-Djava.io.tmpdir=/tmp"),
    ("spark.sql.warehouse.dir", "/tmp/spark-warehouse"),
    ("spark.hadoop.fs.file.impl", "org.apache.hadoop.fs.LocalFileSystem"),
    ("spark.executor.memory", "2g"),
    ("spark.driver.memory", "2g"),
    ("spark.sql.catalogImplementation", "hive"),
])

# Build (or reuse) the session; any failure is reported and re-raised
try:
    spark = (
        SparkSession.builder
        .config(conf=conf)
        .appName("DataProcessing")
        .master(spark_master)
        .enableHiveSupport()
        .getOrCreate()
    )

    # Only show warnings and above from Spark's own logging
    spark.sparkContext.setLogLevel("WARN")
    print(f"Spark session created successfully. Version: {spark.version}")

except Exception as e:
    print(f"Failed to initialize Spark session: {str(e)}")
    raise


Spark session created successfully. Version: 3.5.0


In [2]:
import os
import shutil

# Sub-directory of the gold layer that receives the Spark CSV output.
# NOTE: this rebinds OUTPUT_DATA_PATH, so every later cell writes here.
OUTPUT_DATA_PATH = "/home/jovyan/data/gold/spark_output"

# Start from an empty directory. shutil.rmtree replaces the previous
# `rm -rf` shell call: no shell is spawned and the path is never interpolated
# into a command string. ignore_errors=True keeps the best-effort behaviour
# (a missing directory is not an error).
shutil.rmtree(OUTPUT_DATA_PATH, ignore_errors=True)
os.makedirs(OUTPUT_DATA_PATH, exist_ok=True)
os.chmod(OUTPUT_DATA_PATH, 0o777)

# Smoke test: write a one-row DataFrame to confirm Spark can write to the path
test_df = spark.createDataFrame([(1, "test")], ["id", "value"])
test_df.coalesce(1).write \
    .option("mapreduce.fileoutputcommitter.algorithm.version", "2") \
    .mode("overwrite") \
    .csv(OUTPUT_DATA_PATH)

print("Success! Check output at:", OUTPUT_DATA_PATH)

Success! Check output at: /home/jovyan/data/gold/spark_output


In [3]:


# Rebind `spark` via getOrCreate(), which returns the current session when one exists
spark = SparkSession.builder.getOrCreate()



In [4]:
from pyspark.sql.types import (
    StructType, StructField, StringType, ArrayType
)

# Schema of one entry in the "purchase" array: every field is a nullable string
purchase_item_schema = StructType([
    StructField(field_name, StringType(), True)
    for field_name in ("Código", "Descrição", "Qtde", "Un", "Vl Unit", "Vl Total")
])

# Schema of a whole receipt document: nullable string fields followed by the
# array of purchase items. purchase_date stays a string here and is parsed
# after loading.
json_schema = StructType(
    [
        StructField(field_name, StringType(), True)
        for field_name in (
            "store",
            "cnpj",
            "store_state_code",
            "store_address",
            "purchase_date",
            "access_key",
        )
    ]
    + [StructField("purchase", ArrayType(purchase_item_schema), True)]
)


In [5]:
# Read the JSON files under the bronze path with the explicit schema;
# multiLine lets a single JSON record span several lines.
df = (
    spark.read
    .schema(json_schema)
    .option("multiLine", True)
    .json(INPUT_DATA_PATH)
)
df.printSchema()

root
 |-- store: string (nullable = true)
 |-- cnpj: string (nullable = true)
 |-- store_state_code: string (nullable = true)
 |-- store_address: string (nullable = true)
 |-- purchase_date: string (nullable = true)
 |-- access_key: string (nullable = true)
 |-- purchase: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- Código: string (nullable = true)
 |    |    |-- Descrição: string (nullable = true)
 |    |    |-- Qtde: string (nullable = true)
 |    |    |-- Un: string (nullable = true)
 |    |    |-- Vl Unit: string (nullable = true)
 |    |    |-- Vl Total: string (nullable = true)



In [6]:
# List the files backing `df`; the trailing expression displays how many there are.
files = df.inputFiles()
len(files)

677

In [7]:
# Preview the first two receipts (long values are truncated in the output).
df.show(2)

+--------------------+------------------+----------------+--------------------+-------------------+--------------------+--------------------+
|               store|              cnpj|store_state_code|       store_address|      purchase_date|          access_key|            purchase|
+--------------------+------------------+----------------+--------------------+-------------------+--------------------+--------------------+
|RIGHI COM.DE GEN....|89.897.201/0002-28|      1060042441|RUA JOAO PESSOA, ...|03/09/2024 09:18:50|4324 0989 8972 01...|[{195, BERINJELA ...|
|RIGHI COM.DE GEN....|89.897.201/0002-28|      1060042441|RUA JOAO PESSOA, ...|08/07/2024 15:19:27|4324 0789 8972 01...|[{29, RUCULA UN, ...|
+--------------------+------------------+----------------+--------------------+-------------------+--------------------+--------------------+
only showing top 2 rows



In [8]:
from pyspark.sql.functions import explode, col, to_timestamp

# Fields carried by each element of the "purchase" array, in output order
purchase_fields = ["Código", "Descrição", "Qtde", "Un", "Vl Unit", "Vl Total"]

# Step 1: one row per purchased item, keeping the store and the parsed timestamp
df_items = df.select(
    "store",
    to_timestamp(col("purchase_date"), "dd/MM/yyyy HH:mm:ss").alias("purchased_at"),
    explode("purchase").alias("purchase_item"),
)

# Step 2: lift the struct fields to top-level columns, item fields first
df_exploded = df_items.select(
    *[col("purchase_item").getField(name).alias(name) for name in purchase_fields],
    "store",
    "purchased_at",
)

df_exploded.show(10, truncate=False)

+------+--------------------+-----+----+-------+--------+------------------------------+-------------------+
|Código|Descrição           |Qtde |Un  |Vl Unit|Vl Total|store                         |purchased_at       |
+------+--------------------+-----+----+-------+--------+------------------------------+-------------------+
|195   |BERINJELA KG        |0,605|KG  |7,98   |4,83    |RIGHI COM.DE GEN.ALIM.LTDA-F.1|2024-09-03 09:18:50|
|196   |CEBOLA KG           |0,775|KG  |4,45   |3,45    |RIGHI COM.DE GEN.ALIM.LTDA-F.1|2024-09-03 09:18:50|
|197   |CENOURA KG          |0,665|KG  |3,12   |2,07    |RIGHI COM.DE GEN.ALIM.LTDA-F.1|2024-09-03 09:18:50|
|223   |TOMATE LONGA VIDA KG|1,145|KG  |3,77   |4,32    |RIGHI COM.DE GEN.ALIM.LTDA-F.1|2024-09-03 09:18:50|
|257   |ABOBORA DE TRONCO KG|1,43 |KG  |12,6   |18,02   |RIGHI COM.DE GEN.ALIM.LTDA-F.1|2024-09-03 09:18:50|
|726   |BERGAMOTA MONTEN.KG |0,65 |KG  |5,9    |3,84    |RIGHI COM.DE GEN.ALIM.LTDA-F.1|2024-09-03 09:18:50|
|285   |Q.LANCHE FA

In [9]:
# Count the item-level rows in the flattened DataFrame and report the total.
row_count = df_exploded.count()
print(f"Number of rows: {row_count}")

Number of rows: 4364


In [10]:
from pyspark.sql.functions import col, regexp_replace
from pyspark.sql.types import DoubleType

def clean_decimal_columns(df, column_names):
    """
    Convert string columns holding comma-decimal numbers to DoubleType.

    Values such as "0,605" become 0.605. When a value also uses "." as a
    thousands separator ("1.234,56"), those dots are removed first so the
    result is 1234.56 instead of an unparseable "1.234.56" (which would cast
    to null). Values without a comma (e.g. "7.98") are cast as they are.

    Args:
        df (DataFrame): The Spark DataFrame.
        column_names (list): List of column names (strings) to clean.

    Returns:
        DataFrame: The transformed DataFrame with the listed columns cast to
        double. Values that still cannot be parsed become null.
    """
    for col_name in column_names:
        # Drop a "." only when a "," follows later in the value, i.e. when the
        # dot is a grouping separator rather than the decimal point.
        without_grouping = regexp_replace(col(col_name), r"\.(?=.*,)", "")
        df = df.withColumn(
            col_name,
            regexp_replace(without_grouping, ",", ".").cast(DoubleType())
        )
    return df

In [11]:
# Numeric columns that were loaded as strings and need converting to double.
columns_to_clean = ["Qtde", "Vl Unit", "Vl Total"]
df_cleaned = clean_decimal_columns(df_exploded, columns_to_clean)

# Confirm the new column types and preview a few converted rows.
df_cleaned.printSchema()
df_cleaned.show(5, truncate=False)

root
 |-- Código: string (nullable = true)
 |-- Descrição: string (nullable = true)
 |-- Qtde: double (nullable = true)
 |-- Un: string (nullable = true)
 |-- Vl Unit: double (nullable = true)
 |-- Vl Total: double (nullable = true)
 |-- store: string (nullable = true)
 |-- purchased_at: timestamp (nullable = true)

+------+--------------------+-----+---+-------+--------+------------------------------+-------------------+
|Código|Descrição           |Qtde |Un |Vl Unit|Vl Total|store                         |purchased_at       |
+------+--------------------+-----+---+-------+--------+------------------------------+-------------------+
|195   |BERINJELA KG        |0.605|KG |7.98   |4.83    |RIGHI COM.DE GEN.ALIM.LTDA-F.1|2024-09-03 09:18:50|
|196   |CEBOLA KG           |0.775|KG |4.45   |3.45    |RIGHI COM.DE GEN.ALIM.LTDA-F.1|2024-09-03 09:18:50|
|197   |CENOURA KG          |0.665|KG |3.12   |2.07    |RIGHI COM.DE GEN.ALIM.LTDA-F.1|2024-09-03 09:18:50|
|223   |TOMATE LONGA VIDA KG|1.145

In [12]:
def rename_columns_to_uppercase(df):
    """
    Give the known columns their English UPPER_SNAKE_CASE names.

    Renames are applied one by one with ``withColumnRenamed``; columns that
    are not in the mapping are passed through unchanged.

    Args:
        df (DataFrame): Input DataFrame with original column names.

    Returns:
        DataFrame: DataFrame with renamed columns.
    """
    # (original name, new name) pairs, applied in this order
    column_pairs = (
        ("Código", "CODE"),
        ("Descrição", "DESCRIPTION"),
        ("Qtde", "QUANTITY"),
        ("Un", "UNIT"),
        ("Vl Unit", "UNIT_PRICE"),
        ("Vl Total", "TOTAL_PRICE"),
        ("store", "STORE"),
        ("purchased_at", "PURCHASED_AT"),
    )

    renamed_df = df
    for source_name, target_name in column_pairs:
        renamed_df = renamed_df.withColumnRenamed(source_name, target_name)
    return renamed_df

In [13]:
# Apply the English UPPER_SNAKE_CASE column names, then check schema and sample rows.
df_renamed = rename_columns_to_uppercase(df_cleaned)
df_renamed.printSchema()
df_renamed.show(10, truncate=False)

root
 |-- CODE: string (nullable = true)
 |-- DESCRIPTION: string (nullable = true)
 |-- QUANTITY: double (nullable = true)
 |-- UNIT: string (nullable = true)
 |-- UNIT_PRICE: double (nullable = true)
 |-- TOTAL_PRICE: double (nullable = true)
 |-- STORE: string (nullable = true)
 |-- PURCHASED_AT: timestamp (nullable = true)

+-----+--------------------+--------+----+----------+-----------+------------------------------+-------------------+
|CODE |DESCRIPTION         |QUANTITY|UNIT|UNIT_PRICE|TOTAL_PRICE|STORE                         |PURCHASED_AT       |
+-----+--------------------+--------+----+----------+-----------+------------------------------+-------------------+
|195  |BERINJELA KG        |0.605   |KG  |7.98      |4.83       |RIGHI COM.DE GEN.ALIM.LTDA-F.1|2024-09-03 09:18:50|
|196  |CEBOLA KG           |0.775   |KG  |4.45      |3.45       |RIGHI COM.DE GEN.ALIM.LTDA-F.1|2024-09-03 09:18:50|
|197  |CENOURA KG          |0.665   |KG  |3.12      |2.07       |RIGHI COM.DE GEN.ALI

In [14]:
# Keep only the item description; the other columns are not carried into this DataFrame.
df_description = df_renamed.select("DESCRIPTION")



In [15]:
from pyspark.sql import functions as F
from pyspark.sql.types import StringType

# Define keyword lists (all lowercase) for categorization
vegetables = ['berinjela', 'cebola', 'cenoura', 'tomate', 'abobora', 'pepino', 'rucula', 'batata', 'alface', 'brocolis', 'repolho', 'beterraba', 'mandioquinha']
meats = ['bife', 'file', 'coxinha', 'chuleta', 'bacon', 'patinho', 'peito', 'frango', 'carne', 'coxa', 'linguiça', 'guisado', 'atum']
dairy = ['leite', 'iogurte', 'queijo', 'mussarela', 'nata', 'margarina', 'manteiga', 'q.lanche']
beverages = ['coca-cola', 'agua', 'suco', 'cha', 'energi', 'monster', 'cafe', 'nescafe', 'vinho', 'cerveja', 'guarana','dolce gusto']
seasonings = ['tempero', 'molho', 'sazon', 'mostarda', 'catchup', 'sal', 'oregano', 'paprica', 'chimichurri', 'cominho', 'coentro', 'maionese', 'ext.elefante', 'vinagre']
grains = ['arroz', 'feijao', 'farinha', 'massa', 'pao', 'bolo', 'tapioca', 'milho', 'lentilha', 'aveia', 'grao', 'far.maria', 'feij.pto']
snacks = ['biscoito', 'chocolate', 'snickers', 'bombom', 'paodequeijo', 'gelatina', 'dulce', 'doce', 'barra', 'cookie', 'pipoca']
fruits = ['banana', 'laranja', 'maca', 'abacaxi', 'pera', 'uva', 'mamão', 'goiaba', 'manga', 'kiwi', 'ameixa', 'bergamota', 'tangerina', 'caqui', 'caju', 'morango']

non_food_keywords = ['amac.downy', 'esponja', 'papel hig', 'det.liq', 'sab liq', 'limpador', 'colgate', 'vela', 'toalha', 'algodao', 'abs', 'antartica', 'isquiero', 'suporte', 'coador', 'pinça', 'lixa', 'alicate', 'cortador', 'perfume', 'desodorante', 'sabonete', 'pente', 'escova', 'gel', 'repelente', 'pasta', 'creme', 'shampoo', 'condicionador']


def _normalize(text):
    """Lower-case ``text`` and strip diacritics ('MAMÃO' -> 'mamao')."""
    # Local import so the helper carries its own dependency wherever it runs
    import unicodedata

    decomposed = unicodedata.normalize("NFKD", text.lower())
    return "".join(ch for ch in decomposed if not unicodedata.combining(ch))


# Food categories in priority order (first matching category wins), with
# keywords normalized once so accented entries such as 'linguiça' and 'mamão'
# also match unaccented descriptions.
_FOOD_RULES = [
    (label, [_normalize(keyword) for keyword in keywords])
    for label, keywords in (
        ("Vegetable", vegetables),
        ("Meat", meats),
        ("Dairy", dairy),
        ("Beverage", beverages),
        ("Seasoning", seasonings),
        ("Grain", grains),
        ("Snack", snacks),
        ("Fruit", fruits),
    )
]
_NON_FOOD = [_normalize(keyword) for keyword in non_food_keywords]


def categorize(description):
    """Classify an item description by keyword (substring) matching.

    Matching ignores case and accents. Non-food keywords take precedence over
    food keywords, except when the non-food keyword is only a fragment of a
    food keyword that also matched (e.g. 'gel' inside 'gelatina'). Among food
    categories the priority is Vegetable, Meat, Dairy, Beverage, Seasoning,
    Grain, Snack, Fruit.

    Args:
        description (str | None): Item description text.

    Returns:
        tuple: ``(IS_FOOD, FOOD_TYPE)`` where IS_FOOD is "Yes" or "No".
        ``None`` and unmatched descriptions yield ``("No", "Non-food")``.
    """
    if description is None:
        return ("No", "Non-food")

    desc = _normalize(description)

    # All (category, keyword) matches, already in category priority order
    food_hits = [
        (label, keyword)
        for label, keywords in _FOOD_RULES
        for keyword in keywords
        if keyword in desc
    ]

    # A non-food keyword counts only if no matched food keyword contains it;
    # otherwise 'gel' would hide every 'gelatina' item.
    for non_food in _NON_FOOD:
        if non_food in desc and not any(non_food in keyword for _, keyword in food_hits):
            return ("No", "Non-food")

    if food_hits:
        return ("Yes", food_hits[0][0])

    return ("No", "Non-food")

# Register the UDF. The return type is given as a DDL string, so this cell no
# longer reaches for StructType through pyspark.sql.functions (it is only
# reachable there because that module imports it for its own use).
categorize_udf = F.udf(categorize, returnType="struct<IS_FOOD:string,FOOD_TYPE:string>")

# Apply the UDF and expand the returned struct into two plain columns
df_categorized = (
    df_description
    .withColumn("category", categorize_udf(F.col("DESCRIPTION")))
    .withColumn("IS_FOOD", F.col("category.IS_FOOD"))
    .withColumn("FOOD_TYPE", F.col("category.FOOD_TYPE"))
    .drop("category")
)

# Show sample
df_categorized.show(10, truncate=False)

# Items that ended up as non-food (explicit non-food keywords and unmatched descriptions alike)
df_unknown = df_categorized.filter((F.col("IS_FOOD") == "No") & (F.col("FOOD_TYPE") == "Non-food"))

# Collect their descriptions to the driver as a plain Python list.
# NOTE: nothing is written to disk here; the list is only held in memory.
unknown_list = [row["DESCRIPTION"] for row in df_unknown.select("DESCRIPTION").collect()]




+--------------------+-------+---------+
|DESCRIPTION         |IS_FOOD|FOOD_TYPE|
+--------------------+-------+---------+
|BERINJELA KG        |Yes    |Vegetable|
|CEBOLA KG           |Yes    |Vegetable|
|CENOURA KG          |Yes    |Vegetable|
|TOMATE LONGA VIDA KG|Yes    |Vegetable|
|ABOBORA DE TRONCO KG|Yes    |Vegetable|
|BERGAMOTA MONTEN.KG |Yes    |Fruit    |
|Q.LANCHE FAT KG     |Yes    |Dairy    |
|GUISADO DE PRIMEIRA |Yes    |Meat     |
|AMAC.DOWNY 1.5L     |No     |Non-food |
|FERM.ROYAL PO 250G  |No     |Non-food |
+--------------------+-------+---------+
only showing top 10 rows



In [16]:
# Write the categorized items as CSV with a header row; coalesce(1) reduces the
# data to a single partition first, and any existing output is overwritten.
(
    df_categorized
    .coalesce(1)
    .write
    .mode("overwrite")
    .option("header", True)
    .csv(OUTPUT_DATA_PATH)
)
print(f"Categorized CSV saved to: {OUTPUT_DATA_PATH}")

Categorized CSV saved to: /home/jovyan/data/gold/spark_output
