In [1]:
import pandas as pd
from deltalake import DeltaTable
import pyarrow as pa
from pyarrow import dataset as ds
import deltalake

import os

# Read the raw Superstore extract (decoded as latin-1, as the original load did).
df = pd.read_csv('Sample-Superstore.csv', encoding='latin-1')

# Convert the pandas DataFrame to a PyArrow Table, which write_deltalake accepts.
table = pa.Table.from_pandas(df)

# MinIO connection settings passed to deltalake as storage_options.
# Credentials and endpoint are read from the environment when set, so real
# secrets need not be committed with the notebook; the fallbacks are the local
# MinIO values this notebook was originally written against.
storage_options = {
    "aws_access_key_id": os.environ.get("MINIO_ACCESS_KEY", "minio"),
    "aws_secret_access_key": os.environ.get("MINIO_SECRET_KEY", "minio123"),
    "aws_endpoint_url": os.environ.get("MINIO_ENDPOINT", "http://localhost:9000"),
    "aws_allow_http": "true",  # the default endpoint above is plain HTTP
    # NOTE(review): this is a Spark/Hadoop S3A setting rather than a delta-rs
    # storage option; presumably ignored here — confirm path-style access works
    # as intended for your MinIO setup.
    "fs.s3a.path.style.access": "true",
}

# Target table location in MinIO (bucket "datalake-abc", prefix bronze/superstore).
table_path = "s3a://datalake-abc/bronze/superstore"

# Write the Delta table; mode='overwrite' replaces the table contents on each run.
deltalake.write_deltalake(
    table_path,
    table,
    storage_options=storage_options,
    mode='overwrite',
)

print("Data successfully written to Delta Lake table in MinIO")

Data successfully written to Delta Lake table in MinIO


In [None]:
import trino
import pandas as pd

# Read the bronze superstore table back through Trino to confirm it is queryable.
conn = trino.dbapi.connect(
    host='localhost',
    port=8080,
    user='trino',
    catalog='abc',
    schema='bronze'
)

query = "SELECT * FROM abc.bronze.superstore"

cursor = conn.cursor()
try:
    cursor.execute(query)
    # DB-API: cursor.description holds one tuple per column; item 0 is its name.
    columns = [desc[0] for desc in cursor.description]
    rows = cursor.fetchall()
finally:
    # Release the cursor and connection even if the query fails.
    cursor.close()
    conn.close()

df_trino = pd.DataFrame(rows, columns=columns)

print("Data successfully loaded from Trino")
print(f"Number of rows: {len(df_trino)}")
df_trino.head()

In [None]:
import trino
import pandas as pd
# Rebuild abc.silver.florida_superstores from the bronze table.
# The DROP must target the same table the CTAS creates; otherwise the
# CREATE TABLE ... AS fails on every re-run because the table already exists.
conn = trino.dbapi.connect(
    host='localhost',
    port=8080,
    user='trino',
    catalog='abc',
    schema='silver'
)

# Create the table with the desired name
create_table_query = """
CREATE TABLE abc.silver.florida_superstores AS
SELECT * FROM abc.bronze.superstore
WHERE state = 'Florida'
"""

cursor = conn.cursor()
try:
    cursor.execute("DROP TABLE IF EXISTS abc.silver.florida_superstores")
    # The trino DB-API cursor fetches results lazily; consume the result so the
    # statement runs to completion before the next one (or close()) is issued.
    cursor.fetchall()

    cursor.execute(create_table_query)
    cursor.fetchall()
finally:
    cursor.close()

print("Florida data successfully written to silver schema with correct table name")

In [None]:
# Read the silver Florida table into a DataFrame.
# Relies on the `conn` opened in the previous cell and closes it when done.
query = "SELECT * FROM abc.silver.florida_superstores"

cursor = conn.cursor()
try:
    cursor.execute(query)
    # DB-API: cursor.description holds one tuple per column; item 0 is its name.
    columns = [desc[0] for desc in cursor.description]
    rows = cursor.fetchall()
finally:
    # Release the cursor and connection even if the query fails.
    cursor.close()
    conn.close()

df_trino_silver = pd.DataFrame(rows, columns=columns)

print("Data successfully loaded from Trino")
print(f"Number of rows: {len(df_trino_silver)}")
df_trino_silver.head()

In [None]:
# Append California rows from the bronze superstore table into the silver
# florida_superstores table, and report how many rows were added.
# Opens its own connection instead of relying on a `conn` left over from an
# earlier cell, which may already have been closed.
conn = trino.dbapi.connect(
    host='localhost',
    port=8080,
    user='trino',
    catalog='abc',
    schema='silver'
)

# Source is bronze.superstore, target is silver.florida_superstores
# (the same table the before/after counts read).
insert_sql = """
INSERT INTO abc.silver.florida_superstores
SELECT * FROM abc.bronze.superstore
WHERE state = 'California'
"""

cursor = conn.cursor()
try:
    # count before
    cursor.execute("SELECT count(*) FROM abc.silver.florida_superstores")
    before = cursor.fetchone()[0]

    cursor.execute(insert_sql)
    # Consume the INSERT's result so it finishes before the next statement;
    # the trino DB-API cursor does not wait for completion on execute().
    cursor.fetchall()

    # count after
    cursor.execute("SELECT count(*) FROM abc.silver.florida_superstores")
    after = cursor.fetchone()[0]
finally:
    cursor.close()

print(f"Inserted {after - before} rows (before: {before}, after: {after})")

In [None]:
# Time travel: read abc.silver.florida_superstores as it was at `as_of`.
# `conn` is left open on purpose; later cells reuse it.
conn = trino.dbapi.connect(
    host='localhost',
    port=8080,
    user='trino',
    catalog='abc',
    schema='silver'
)

# NOTE(review): the literal carries no time zone, so Trino presumably interprets
# it in the session time zone — confirm it matches the table version you want.
as_of = "2025-10-23 19:31:00"

# Without the FOR TIMESTAMP AS OF clause this would return current data,
# contradicting the "Data as of ..." message below.
time_travel_query = f"""
SELECT *
FROM abc.silver.florida_superstores
FOR TIMESTAMP AS OF TIMESTAMP '{as_of}'
"""

cursor = conn.cursor()
try:
    cursor.execute(time_travel_query)
    # DB-API: cursor.description holds one tuple per column; item 0 is its name.
    columns = [desc[0] for desc in cursor.description]
    rows = cursor.fetchall()
finally:
    cursor.close()

df_timetravel = pd.DataFrame(rows, columns=columns)

print(f"Data as of {as_of}")
print(f"Number of rows: {len(df_timetravel)}")
df_timetravel.head()

In [None]:
import requests
import json
import pandas as pd

# Pull billing-status records from the portal API and append them to
# abc.csmsales.billing_status, storing every column as VARCHAR.
# Reuses the `conn` opened in an earlier cell; table names are fully qualified.
api_url = "https://kwportalservice.csmpl.com/OdooSynSVC.svc/GetComponewiseBillingStatusDetails"
# A timeout stops the cell from hanging indefinitely; raise_for_status surfaces
# HTTP errors instead of a confusing JSON decode failure later.
response = requests.get(api_url, timeout=60)
response.raise_for_status()
data = response.json()

# NOTE(review): assumes the endpoint returns a list of flat JSON objects — confirm.
df_api = pd.DataFrame(data)

if df_api.empty:
    # No columns/rows would produce invalid CREATE/INSERT SQL.
    print("API returned no rows; nothing to load.")
else:
    # Quote identifiers (doubling embedded quotes) so API keys containing spaces,
    # dashes or reserved words are still valid column names.
    column_defs = ",\n    ".join(
        '"{}" VARCHAR'.format(str(col).replace('"', '""')) for col in df_api.columns
    )
    create_table_sql = f"""
CREATE TABLE IF NOT EXISTS abc.csmsales.billing_status (
    {column_defs}
)
"""

    # Every column is VARCHAR, so bind strings (NULL for missing values) rather
    # than raw ints/floats. itertuples keeps per-column dtypes, unlike .values,
    # which can upcast e.g. ints to floats in all-numeric frames.
    values = [
        tuple(None if pd.isna(v) else str(v) for v in row)
        for row in df_api.itertuples(index=False, name=None)
    ]
    placeholders = ",".join(["?" for _ in df_api.columns])
    insert_sql = f"INSERT INTO abc.csmsales.billing_status VALUES ({placeholders})"

    cursor = conn.cursor()
    try:
        cursor.execute(create_table_sql)
        # The trino DB-API cursor fetches lazily; consume results so each
        # statement completes before the next one (or close()) is issued.
        cursor.fetchall()

        # NOTE: executemany issues one INSERT per row, so each row presumably
        # becomes its own table version.
        cursor.executemany(insert_sql, values)
        cursor.fetchall()
    finally:
        cursor.close()

    print("API data successfully loaded into Trino table")

In [None]:
import requests
import pandas as pd
import trino

# Load the billing-status API data into delta.test_schema.billing_status with a
# single INSERT statement, so the load lands as one table version.
conn = trino.dbapi.connect(
    host='datalake.csmpl.com',
    port=443,
    user='trino',
    catalog='delta',
    schema='test_schema'
)

# Fetch API data. The timeout prevents an indefinite hang, and raise_for_status
# surfaces HTTP errors early.
api_url = "https://kwportalservice.csmpl.com/OdooSynSVC.svc/GetComponewiseBillingStatusDetails"
response = requests.get(api_url, timeout=60)
response.raise_for_status()
data = response.json()
# NOTE(review): assumes the endpoint returns a list of flat JSON objects — confirm.
df_api = pd.DataFrame(data)

if df_api.empty:
    # An empty payload would produce "VALUES " with no rows (invalid SQL).
    print("API returned no rows; nothing to load.")
else:
    # Quote identifiers (doubling embedded quotes) so API keys containing spaces,
    # dashes or reserved words are still valid column names.
    cols = ",\n    ".join(
        '"{}" VARCHAR'.format(str(col).replace('"', '""')) for col in df_api.columns
    )

    # Build one multi-row VALUES list as SQL string literals: NULL for missing
    # values, otherwise single quotes escaped by doubling. itertuples is used
    # because iterrows builds a Series per row and can upcast ints to floats.
    values_list = []
    for row in df_api.itertuples(index=False, name=None):
        literals = [
            "NULL" if pd.isna(v) else "'" + str(v).replace("'", "''") + "'"
            for v in row
        ]
        values_list.append("(" + ",".join(literals) + ")")

    # NOTE(review): very large payloads may exceed Trino's maximum query length;
    # chunk the rows if that happens (at the cost of one version per chunk).
    insert_sql = f"""
INSERT INTO delta.test_schema.billing_status VALUES {",".join(values_list)}
"""

    cursor = conn.cursor()
    try:
        cursor.execute(f"""
CREATE TABLE IF NOT EXISTS delta.test_schema.billing_status (
    {cols}
)
""")
        # Consume results so each statement completes: the trino DB-API cursor
        # fetches lazily, and closing it can cancel a still-running query.
        cursor.fetchall()

        cursor.execute(insert_sql)
        cursor.fetchall()
    finally:
        cursor.close()

    print("✅ Data loaded in a single version.")