In [1]:
import io

import pandas as pd
import pyarrow as pa
from IPython.display import display
from minio import Minio
from pyiceberg.catalog import load_catalog
from pyiceberg.exceptions import TableAlreadyExistsError
from pyiceberg.schema import Schema
from pyiceberg.types import StringType, DoubleType, NestedField

In [2]:
import os
from dotenv import load_dotenv

# Pull the variables defined in the local .env file into the process environment.
load_dotenv()

# Each setting below is None when the corresponding variable is not set.

# MinIO object-store settings
MINIO_URL = os.environ.get("MINIO_URL")
MINIO_ENDPOINT = os.environ.get("MINIO_ENDPOINT")
MINIO_ACCESS_KEY = os.environ.get("MINIO_ACCESS_KEY")
MINIO_SECRET_KEY = os.environ.get("MINIO_SECRET_KEY")

# RAW catalog settings
RAW_CATALOG_URL = os.environ.get("RAW_CATALOG_URL")
RAW_CATALOG_PORT = os.environ.get("RAW_CATALOG_PORT")
RAW_CATALOG_NAME = os.environ.get("RAW_CATALOG_NAME")

# TABLE catalog settings
TABLE_CATALOG_URL = os.environ.get("TABLE_CATALOG_URL")
TABLE_CATALOG_PORT = os.environ.get("TABLE_CATALOG_PORT")
TABLE_CATALOG_NAME = os.environ.get("TABLE_CATALOG_NAME")

In [12]:
# Load the RAW and TABLE Iceberg catalogs. Both use the same MinIO endpoint
# and credentials for S3 access and differ only in their catalog URI.
s3_properties = {
    's3.endpoint': MINIO_URL,
    's3.access-key-id': MINIO_ACCESS_KEY,
    's3.secret-access-key': MINIO_SECRET_KEY,
}

catalog_raw = load_catalog(
    'RAW',
    **{'uri': f"{RAW_CATALOG_URL}:{RAW_CATALOG_PORT}", **s3_properties},
)
catalog_table = load_catalog(
    'TABLE',
    **{'uri': f"{TABLE_CATALOG_URL}:{TABLE_CATALOG_PORT}", **s3_properties},
)

In [4]:
# Direct MinIO client used to browse and download objects.
# secure=False: the endpoint is contacted without TLS.
minio_client = Minio(
    endpoint=MINIO_ENDPOINT,
    access_key=MINIO_ACCESS_KEY,
    secret_key=MINIO_SECRET_KEY,
    secure=False,
)

In [None]:
# Print the name of each entry returned when listing the "landing" bucket.
for entry in minio_client.list_objects(bucket_name="landing"):
    print(entry.object_name)

In [19]:
# Download the employees CSV from the "landing" bucket.
# The response returned by get_object holds an open HTTP connection that the
# caller must close and release, so the payload is read fully into memory and
# the connection is handed back even if the read fails.
response = minio_client.get_object(
    bucket_name="landing",
    object_name="Data/employees.csv",
)
try:
    # Keep the original name `data`, now bound to an in-memory binary buffer
    # that downstream readers (e.g. pd.read_csv) can consume.
    data = io.BytesIO(response.read())
finally:
    response.close()
    response.release_conn()

In [20]:
# Parse the downloaded object as CSV into a DataFrame.
# NOTE: this rebinds `data` from the downloaded object to the parsed frame,
# so re-running this cell without re-running the download cell will fail.
data = pd.read_csv(filepath_or_buffer=data)

In [29]:
# Preview the first five rows of the parsed frame.
data.head(n=5)

Unnamed: 0,EmployeeID,FirstName,LastName,Department,Position,Email
0,1,Edward,Thompson,Legal,Paralegal,emcbride@example.org
1,2,Courtney,Lopez,Product,Product Analyst,jtran@example.net
2,3,Douglas,Ayala,Human Resources,Recruiter,skline@example.net
3,4,Shannon,Roberts,Legal,Paralegal,fordangela@example.org
4,5,Andrew,Sanchez,Operations,Operations Analyst,allen79@example.com


In [3]:
# Schema for the table: a city name plus two double columns ("lat", "long").
# Every field is optional (required=False).
schema = Schema(
    NestedField(1, "city", StringType(), required=False),
    NestedField(2, "lat", DoubleType(), required=False),
    NestedField(3, "long", DoubleType(), required=False),
)

table_name = "my_namespace.cities_1"  # Specify your table name

try:
    # Create the table in the RAW catalog.
    my_table = catalog_raw.create_table(
        identifier=table_name,
        schema=schema,
    )
except TableAlreadyExistsError:
    print(f"Table {table_name} already exists in the catalog")
    # Bind the existing table so `my_table` is defined on re-runs as well,
    # instead of being left unset when creation is skipped.
    my_table = catalog_raw.load_table(table_name)

Table my_namespace.cities_1 already exists in the catalog


In [None]:
# Sample rows as (city, lat, long) tuples, turned into an Arrow table whose
# column names match the keys used below.
field_names = ("city", "lat", "long")
city_rows = [
    ("Amsterdam", 52.371807, 4.896029),
    ("San Francisco", 37.773972, -122.431297),
    ("Drachten", 53.11254, 6.0989),
    ("Paris", 48.864716, 2.349014),
]
df = pa.Table.from_pylist(
    [dict(zip(field_names, row)) for row in city_rows],
)

In [None]:
# Load the table from the RAW catalog and overwrite it with the contents
# of `df`.
table = catalog_raw.load_table(identifier="my_namespace.cities_1")
table.overwrite(df)

In [4]:
# Read the table back from the RAW catalog and show it as a pandas frame.
cities_table = catalog_raw.load_table("my_namespace.cities_1")
scan = cities_table.scan()
pandas_df = scan.to_pandas()
# NOTE: `df` is rebound here to a pandas DataFrame; an earlier cell binds the
# same name to a pyarrow Table, so re-running the overwrite cell after this one
# would pass it a pandas object instead.
df = pd.DataFrame(pandas_df)
display(df)

Unnamed: 0,city,lat,long
0,Amsterdam,52.371807,4.896029
1,San Francisco,37.773972,-122.431297
2,Drachten,53.11254,6.0989
3,Paris,48.864716,2.349014
