In [5]:
# Librerías generales
#=========================
import json
from json import dumps
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, explode, from_json, col, current_date
from pyspark.sql.types import StringType, StructType, StructField, ArrayType
import datetime 
import requests
import urllib3
from datetime import datetime,date
import pytz
import io
import glob
import os
import boto3

In [6]:
# Connection settings for the Nessie catalog and the MinIO object store.
# Every value can be overridden through an environment variable so that
# credentials and container addresses need not be hard-coded in the notebook;
# the defaults reproduce the original local docker setup.
CATALOG_URI = os.environ.get("NESSIE_URI", "http://nessie:19120/api/v1")  # Nessie server URI
WAREHOUSE = os.environ.get("NESSIE_WAREHOUSE", "s3://gold/")  # MinIO location the catalog writes to
# NOTE(review): this default IP was taken from `docker inspect` and can change
# when the container is recreated; prefer setting MINIO_ENDPOINT explicitly.
STORAGE_URI = os.environ.get("MINIO_ENDPOINT", "http://172.20.0.5:9000")
# Local-development defaults only; set MINIO_ACCESS_KEY / MINIO_SECRET_KEY
# for any shared or non-local environment.
AWS_ACCESS_KEY = os.environ.get("MINIO_ACCESS_KEY", "admin")
AWS_SECRET_KEY = os.environ.get("MINIO_SECRET_KEY", "password")


In [7]:
# boto3 S3 client pointed at MinIO, for object-store access outside Spark.
# Credentials come from the shared AWS_ACCESS_KEY / AWS_SECRET_KEY constants
# instead of being duplicated here. The endpoint defaults to the docker service
# name and can be overridden with MINIO_ENDPOINT.
# NOTE(review): not referenced by any later cell shown here — remove if unused.
minio_client = boto3.client(
    "s3",
    endpoint_url=os.environ.get("MINIO_ENDPOINT", "http://minio:9000"),
    aws_access_key_id=AWS_ACCESS_KEY,
    aws_secret_access_key=AWS_SECRET_KEY,
    region_name="us-east-1",
)

In [8]:
from pyspark.sql import SparkSession
import pyspark

# Combined Spark configuration: PostgreSQL JDBC driver, Iceberg tables in a
# Nessie catalog stored on MinIO, and plain S3A access to MinIO.
spark_packages = ",".join([
    "org.postgresql:postgresql:42.7.3",
    "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.0",
    "org.projectnessie.nessie-integrations:nessie-spark-extensions-3.5_2.12:0.77.1",
    "software.amazon.awssdk:bundle:2.24.8",
    "software.amazon.awssdk:url-connection-client:2.24.8",
    # NOTE(review): hadoop-aws is expected to match the Hadoop version bundled
    # with the Spark distribution, and aws-java-sdk-bundle to match hadoop-aws;
    # confirm 3.2.0 / 1.11.534 against the cluster's Hadoop jars.
    "org.apache.hadoop:hadoop-aws:3.2.0",
    "com.amazonaws:aws-java-sdk-bundle:1.11.534",
])

conf = (
    pyspark.SparkConf()
    .setAppName("combined_spark_app")
    .set("spark.jars.packages", spark_packages)
    # Iceberg and Nessie SQL extensions
    .set("spark.sql.extensions",
         "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,"
         "org.projectnessie.spark.extensions.NessieSparkSessionExtensions")
    # Catalog `nessie`: an Iceberg SparkCatalog backed by Nessie, branch `main`
    .set("spark.sql.catalog.nessie", "org.apache.iceberg.spark.SparkCatalog")
    .set("spark.sql.catalog.nessie.catalog-impl", "org.apache.iceberg.nessie.NessieCatalog")
    .set("spark.sql.catalog.nessie.uri", CATALOG_URI)
    .set("spark.sql.catalog.nessie.ref", "main")
    .set("spark.sql.catalog.nessie.authentication.type", "NONE")
    # Iceberg data/metadata files go through S3FileIO to MinIO
    .set("spark.sql.catalog.nessie.warehouse", WAREHOUSE)
    .set("spark.sql.catalog.nessie.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
    .set("spark.sql.catalog.nessie.s3.endpoint", STORAGE_URI)
    # Address buckets by path (http://host:port/bucket), mirroring
    # fs.s3a.path.style.access below; virtual-hosted names (bucket.host) are
    # typically not resolvable for a MinIO container.
    .set("spark.sql.catalog.nessie.s3.path-style-access", "true")
    # Give S3FileIO the same credentials as S3A instead of relying on
    # AWS_* environment variables being present in the container.
    .set("spark.sql.catalog.nessie.s3.access-key-id", AWS_ACCESS_KEY)
    .set("spark.sql.catalog.nessie.s3.secret-access-key", AWS_SECRET_KEY)
    # Direct S3A access from Spark (e.g. spark.read.parquet("s3a://...")),
    # independent of the Nessie catalog
    .set("spark.hadoop.fs.s3a.endpoint", STORAGE_URI)
    .set("spark.hadoop.fs.s3a.access.key", AWS_ACCESS_KEY)
    .set("spark.hadoop.fs.s3a.secret.key", AWS_SECRET_KEY)
    .set("spark.hadoop.fs.s3a.path.style.access", "true")
    .set("spark.hadoop.fs.s3a.aws.credentials.provider",
         "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
)



In [9]:
# Start the Spark session from `conf`.
# NOTE: getOrCreate() returns an already-running session if the kernel has one,
# in which case static settings such as spark.jars.packages are not re-applied.
spark = (
    SparkSession.builder
    .config(conf=conf)
    .getOrCreate()
)
print("Spark Session Started")

Spark Session Started


In [10]:
# Load the raw yellow-taxi trip file (2025-01, per the file name) from the bronze bucket over S3A.
df_taxis = spark.read.format("parquet").load("s3a://bronze/yellow_tripdata_2025-01.parquet")


In [11]:
# Print the schema read from the Parquet file. The pickup/dropoff columns come
# through as timestamp_ntz (no time zone), as shown in the output below.
df_taxis.printSchema()



root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)



In [12]:
# Text preview of the first five rows (wide rows are cut off in the exported output).
df_taxis.show(n=5)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       1| 2025-01-01 00:18:38|  2025-01-01 00:26:59|              1|          1.6|         1|                 N|         229|         237|           1|       10.0|  3.5|    0.5|       3.

In [13]:
# NOTE(review): in plain Jupyter, display() on a Spark DataFrame renders only
# its repr (column names and types, as in the output below), not any rows;
# use df_taxis.limit(n).toPandas() for a tabular preview.
display(df_taxis)

DataFrame[VendorID: int, tpep_pickup_datetime: timestamp_ntz, tpep_dropoff_datetime: timestamp_ntz, passenger_count: bigint, trip_distance: double, RatecodeID: bigint, store_and_fwd_flag: string, PULocationID: int, DOLocationID: int, payment_type: bigint, fare_amount: double, extra: double, mta_tax: double, tip_amount: double, tolls_amount: double, improvement_surcharge: double, total_amount: double, congestion_surcharge: double, Airport_fee: double]

In [14]:
# Pull a small sample onto the driver as pandas so Jupyter renders a rich table.
pdf = (
    df_taxis
    .limit(10)
    .toPandas()
)
display(pdf)

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,Airport_fee
0,1,2025-01-01 00:18:38,2025-01-01 00:26:59,1,1.6,1,N,229,237,1,10.0,3.5,0.5,3.0,0.0,1.0,18.0,2.5,0.0
1,1,2025-01-01 00:32:40,2025-01-01 00:35:13,1,0.5,1,N,236,237,1,5.1,3.5,0.5,2.02,0.0,1.0,12.12,2.5,0.0
2,1,2025-01-01 00:44:04,2025-01-01 00:46:01,1,0.6,1,N,141,141,1,5.1,3.5,0.5,2.0,0.0,1.0,12.1,2.5,0.0
3,2,2025-01-01 00:14:27,2025-01-01 00:20:01,3,0.52,1,N,244,244,2,7.2,1.0,0.5,0.0,0.0,1.0,9.7,0.0,0.0
4,2,2025-01-01 00:21:34,2025-01-01 00:25:06,3,0.66,1,N,244,116,2,5.8,1.0,0.5,0.0,0.0,1.0,8.3,0.0,0.0
5,2,2025-01-01 00:48:24,2025-01-01 01:08:26,2,2.63,1,N,239,68,2,19.1,1.0,0.5,0.0,0.0,1.0,24.1,2.5,0.0
6,1,2025-01-01 00:14:47,2025-01-01 00:16:15,0,0.4,1,N,170,170,1,4.4,3.5,0.5,2.35,0.0,1.0,11.75,2.5,0.0
7,1,2025-01-01 00:39:27,2025-01-01 00:51:51,0,1.6,1,N,234,148,1,12.1,3.5,0.5,2.0,0.0,1.0,19.1,2.5,0.0
8,1,2025-01-01 00:53:43,2025-01-01 01:13:23,0,2.8,1,N,148,170,1,19.1,3.5,0.5,3.0,0.0,1.0,27.1,2.5,0.0
9,2,2025-01-01 00:00:02,2025-01-01 00:09:36,1,1.71,1,N,237,262,2,11.4,1.0,0.5,0.0,0.0,1.0,16.4,2.5,0.0


In [56]:
# Make sure the `gold` namespace exists in the Nessie catalog before writing to it.
# The DDL returns an empty, column-less DataFrame, so there is nothing useful to
# .show(); the trailing `;` suppresses the DataFrame[] repr in the cell output.
spark.sql("CREATE NAMESPACE IF NOT EXISTS nessie.gold");

++
||
++
++



In [15]:
# Persist the trips as table nessie.gold.yellowtrip via the DataFrameWriterV2 API,
# creating it or replacing any existing table of that name.
(
    df_taxis
    .writeTo("nessie.gold.yellowtrip")
    .createOrReplace()
)


In [49]:
# Shut down the Spark session at the end of the notebook; `spark` cannot be
# used for further queries after this call.
spark.stop()