In [2]:
# Configuracion de spark

import pyspark

from pyspark.sql import SparkSession, DataFrameWriter, functions as F
from pyspark.sql.types import *
from pyspark.sql.functions import col, get_json_object

app_name = "Avance 2"

# Build the Spark session. The PostgreSQL JDBC jar is added to the classpath
# so that the JDBC reads/writes in later cells can load org.postgresql.Driver.
spark = (
    SparkSession.builder
    .appName(app_name)
    .config("spark.jars", "/spark/jars/postgresql.jar")
    .getOrCreate()
)

# Read back the effective settings from the live session. getOrCreate() may
# hand back an already-running session, so these can presumably differ from
# the values requested above.
app_name = spark.conf.get("spark.app.name")
# NOTE: despite its name, this is the master URL string, not a SparkContext.
spark_context = spark.sparkContext.master


In [3]:
# Connection settings for the PostgreSQL JDBC reads/writes below.
# Values come from environment variables when they are set, so credentials do
# not have to be committed with the notebook. The previous hard-coded values
# remain as defaults so existing runs behave exactly as before.
import os

url = os.environ.get("POSTGRES_JDBC_URL", "jdbc:postgresql://postgres:5432/metabase")
user = os.environ.get("POSTGRES_USER", "postgres")
password = os.environ.get("POSTGRES_PASSWORD", "password")

In [4]:
# Function that appends rows to a PostgreSQL table

def insert_data_to_table(data_to_insert, local_table, id_column="id"):
    """Append ``data_to_insert`` to ``local_table`` over JDBC and return its max id.

    NOTE: a later cell redefines ``insert_data_to_table`` (overwrite mode). Once
    that cell has run, this append-mode version is shadowed and no longer called.

    Args:
        data_to_insert: Spark DataFrame whose rows are appended.
        local_table: Target PostgreSQL table name.
        id_column: Column whose maximum is returned after the append. Defaults
            to ``"id"``; tables such as ``products`` key on another column
            (e.g. ``product_id``), so pass it explicitly for those.

    Returns:
        The maximum value of ``id_column`` in the table after the append.
        This equals the newest row's id only if ids increase monotonically
        (TODO confirm for each table). Returns None if the table is empty or
        if either the write or the read-back fails; an error message is
        printed on failure.
    """
    default_options = {
        "url": url,
        "user": user,
        "password": password,
        "dbtable": local_table,
    }

    # Step 1: the write. Its failure is reported separately from the
    # read-back, so a missing id column is not misreported as a failed insert.
    try:
        (
            data_to_insert.write
            .format("jdbc")
            .options(**default_options)
            # NOTE(review): "ignoreInserts" is not a documented Spark JDBC
            # option and presumably has no effect; kept to preserve behavior.
            .option("ignoreInserts", "true")
            .option("driver", "org.postgresql.Driver")
            .mode("append")
            .save()
        )
    except Exception as e:
        print(f"Error al insertar datos en la tabla {local_table}: {e}")
        return None

    # Step 2: read the table back and take the max id. F.max on a column
    # reference avoids building a SQL expression string from the parameter.
    try:
        local_table_saved = (
            spark.read
            .format("jdbc")
            .options(**default_options)
            .option("driver", "org.postgresql.Driver")
            .load()
        )
        return local_table_saved.agg(F.max(F.col(id_column))).first()[0]
    except Exception as e:
        print(f"Error al leer el ultimo id de la tabla {local_table}: {e}")
        return None


In [5]:
# Function that creates (or replaces) a PostgreSQL table from a DataFrame

def insert_data_to_table(data_to_insert, new_table, mode="overwrite"):
    """Write ``data_to_insert`` to the PostgreSQL table ``new_table`` over JDBC.

    This definition shadows the append-mode ``insert_data_to_table`` defined in
    an earlier cell, so every later call in the notebook uses this one.

    Args:
        data_to_insert: Spark DataFrame to write.
        new_table: Target table name. It must be non-empty, because Spark
            rejects an empty ``dbtable``.
        mode: Spark save mode. Defaults to ``"overwrite"``. Spark's JDBC
            overwrite drops and recreates the table unless the ``truncate``
            option is set, so a table first created from an empty schema
            DataFrame is presumably replaced by this DataFrame's schema
            (TODO confirm).

    Returns:
        None. On success the DataFrame's first rows are printed via ``show()``.
        On failure only an error message is printed, so the output does not
        look like a successful load.
    """
    # Guard against an empty table name (e.g. a call with '') before
    # attempting the write.
    if not new_table or not str(new_table).strip():
        print(f"Error al crear la tabla {new_table!r}: nombre de tabla vacio")
        return None

    default_options = {
        "url": url,
        "user": user,
        "password": password,
        "dbtable": new_table,
    }

    try:
        (
            data_to_insert.write
            .format("jdbc")
            .options(**default_options)
            # NOTE(review): "ignoreInserts" is not a documented Spark JDBC
            # option and presumably has no effect; kept to preserve behavior.
            .option("ignoreInserts", "true")
            .option("driver", "org.postgresql.Driver")
            .mode(mode)
            .save()
        )
    except Exception as e:
        print(f"Error al crear la tabla {new_table}, {e}")
        return None

    # show() prints the preview itself and returns None.
    data_to_insert.show()
    return None


In [9]:
# Create the `products` table in Postgres from an explicit (empty) schema

product_schema = StructType([
    StructField(name, data_type, nullable=nullable)
    for name, data_type, nullable in (
        ("product_id", IntegerType(), False),
        ("product_name", StringType(), False),
        ("aisle_id", IntegerType(), True),
        ("department_id", IntegerType(), True),
    )
])

# An empty DataFrame carries only the schema, so this write creates the table
# without rows.
df_products_empty = spark.createDataFrame([], schema=product_schema)
insert_data_to_table(df_products_empty, "products")

                                                                                

+----------+------------+--------+-------------+
|product_id|product_name|aisle_id|department_id|
+----------+------------+--------+-------------+
+----------+------------+--------+-------------+



In [7]:
# Read the products CSV (header row, column types inferred) and preview it

archivo_csv = '/app/data/products.csv'

data_frame_products = spark.read.csv(archivo_csv, header=True, inferSchema=True)

# show() prints the preview itself and returns None. Wrapping it in print()
# only added a stray "None" line to the cell output.
data_frame_products.show()

+----------+--------------------+--------+-------------+
|product_id|        product_name|aisle_id|department_id|
+----------+--------------------+--------+-------------+
|         1|Chocolate Sandwic...|      61|           19|
|         2|    All-Seasons Salt|     104|           13|
|         3|Robust Golden Uns...|      94|            7|
|         4|Smart Ones Classi...|      38|            1|
|         5|Green Chile Anyti...|       5|           13|
|         6|        Dry Nose Oil|      11|           11|
|         7|Pure Coconut Wate...|      98|            7|
|         8|Cut Russet Potato...|     116|            1|
|         9|Light Strawberry ...|     120|           16|
|        10|Sparkling Orange ...|     115|            7|
|        11|   Peach Mango Juice|      31|            7|
|        12|Chocolate Fudge L...|     119|            1|
|        13|   Saline Nasal Mist|      11|           11|
|        14|Fresh Scent Dishw...|      74|           17|
|        15|Overnight Diapers..

In [9]:
# Load the rows read from products.csv into the Postgres `products` table.
# NOTE(review): with the overwrite-mode insert_data_to_table active, this
# presumably replaces the table created from the empty product_schema DataFrame,
# including its nullability settings. Confirm whether that schema is meant to survive.
insert_data_to_table(data_frame_products, 'products')

+----------+--------------------+--------+-------------+
|product_id|        product_name|aisle_id|department_id|
+----------+--------------------+--------+-------------+
|         1|Chocolate Sandwic...|      61|           19|
|         2|    All-Seasons Salt|     104|           13|
|         3|Robust Golden Uns...|      94|            7|
|         4|Smart Ones Classi...|      38|            1|
|         5|Green Chile Anyti...|       5|           13|
|         6|        Dry Nose Oil|      11|           11|
|         7|Pure Coconut Wate...|      98|            7|
|         8|Cut Russet Potato...|     116|            1|
|         9|Light Strawberry ...|     120|           16|
|        10|Sparkling Orange ...|     115|            7|
|        11|   Peach Mango Juice|      31|            7|
|        12|Chocolate Fudge L...|     119|            1|
|        13|   Saline Nasal Mist|      11|           11|
|        14|Fresh Scent Dishw...|      74|           17|
|        15|Overnight Diapers..

                                                                                

In [10]:
# Create the `aisles` table from an explicit schema, then load aisles.csv into it

aisles_schema = StructType(
    [
        StructField("aisle_id", IntegerType(), nullable=False),
        StructField("aisle", StringType(), nullable=False),
    ]
)

aisles_csv = "/app/data/aisles.csv"
aisles_data_frame = spark.read.csv(aisles_csv, inferSchema=True, header=True)

# An empty DataFrame with the schema creates the table; the second write loads the CSV rows.
aisles_df_empty = spark.createDataFrame([], schema=aisles_schema)

insert_data_to_table(aisles_df_empty, "aisles")
insert_data_to_table(aisles_data_frame, "aisles")


+---+-----+
| id|aisle|
+---+-----+
+---+-----+

+--------+--------------------+
|aisle_id|               aisle|
+--------+--------------------+
|       1|prepared soups sa...|
|       2|   specialty cheeses|
|       3| energy granola bars|
|       4|       instant foods|
|       5|marinades meat pr...|
|       6|               other|
|       7|       packaged meat|
|       8|     bakery desserts|
|       9|         pasta sauce|
|      10|    kitchen supplies|
|      11|    cold flu allergy|
|      12|         fresh pasta|
|      13|      prepared meals|
|      14|tofu meat alterna...|
|      15|    packaged seafood|
|      16|         fresh herbs|
|      17|  baking ingredients|
|      18|bulk dried fruits...|
|      19|       oils vinegars|
|      20|        oral hygiene|
+--------+--------------------+
only showing top 20 rows



In [12]:
# Read departments.csv, create the `departments` table from an explicit schema,
# then load the CSV rows into it.

departments_csv = '/app/data/departments.csv'
departments_data_frame = spark.read.csv(departments_csv, header=True, inferSchema=True)

departments_schema = StructType([
    StructField("department_id", IntegerType(), nullable=False),
    StructField("department", StringType(), nullable=False),
])
departments_df_empty = spark.createDataFrame([], schema=departments_schema)

insert_data_to_table(departments_df_empty, "departments")
# The rows must go to the same table. The previous empty table name ('') is
# rejected by the JDBC writer, so the departments data never reached Postgres.
insert_data_to_table(departments_data_frame, "departments")


+-------------+----------+
|department_id|department|
+-------------+----------+
+-------------+----------+

+-------------+---------------+
|department_id|     department|
+-------------+---------------+
|            1|         frozen|
|            2|          other|
|            3|         bakery|
|            4|        produce|
|            5|        alcohol|
|            6|  international|
|            7|      beverages|
|            8|           pets|
|            9|dry goods pasta|
|           10|           bulk|
|           11|  personal care|
|           12|   meat seafood|
|           13|         pantry|
|           14|      breakfast|
|           15|   canned goods|
|           16|     dairy eggs|
|           17|      household|
|           18|         babies|
|           19|         snacks|
|           20|           deli|
+-------------+---------------+
only showing top 20 rows

