# Trino Demo: Spark Data Preparation

This notebook prepares sample data for the Trino federated analytics demo. It loads three CSV files (customers, orders, products) into Spark and saves them as Hive tables on MinIO, where Trino can query them.

In [1]:
import findspark
findspark.init()

In [2]:
import atexit
import re
import uuid

from IPython.display import display

# Create the Spark session only if this kernel doesn't already define one
if "spark" not in vars():
    import findspark

    findspark.init()
    from pyspark.sql import SparkSession

    spark = (
        SparkSession.builder.master("local[8,2]")  # 8 worker threads, task maxFailures = 2
        .config("spark.driver.memory", "3g")
        .enableHiveSupport()  # use the Hive metastore for saveAsTable / SHOW TABLES
        .getOrCreate()
    )
    atexit.register(lambda: spark.stop())

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import DataFrame


def cleanse_val(val):
    return re.sub(r"([^\s\w\d])+", "", val.lower()).strip() if val else ""


def reg(spark_df, name=None):
    uniqsig = "df_{0}".format(cleanse_val(str(uuid.uuid4()))) if not name else name
    spark_df.createOrReplaceTempView(uniqsig)
    return uniqsig


def show(df, rows=5):
    display(df.limit(rows).toPandas())


# Attach the helpers as DataFrame methods so any DataFrame can call .reg() and .dshow()
DataFrame.reg = reg
DataFrame.dshow = show

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/12/11 19:46:32 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/12/11 19:46:32 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
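`reg()` registers a DataFrame as a temp view and, when no name is given, derives one from a random UUID passed through `cleanse_val`, which lowercases the text and strips punctuation such as hyphens. A Spark-free copy of that naming logic shows what view names to expect; `temp_view_name` is a hypothetical wrapper written only for this illustration, not part of the notebook.

```python
import re
import uuid


def cleanse_val(val):
    # Same logic as the notebook helper: lowercase, drop punctuation/symbols, trim
    return re.sub(r"([^\s\w\d])+", "", val.lower()).strip() if val else ""


def temp_view_name(name=None):
    # Hypothetical helper: mirrors how reg() picks a view name when none is given
    return name if name else "df_{0}".format(cleanse_val(str(uuid.uuid4())))


print(cleanse_val("  Hello, World! "))  # hello world
print(temp_view_name())  # random, e.g. "df_" followed by 32 hex characters
```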


## Load and Prepare Demo Data

We'll load CSV files and create Hive tables that will be accessible through Trino's MinIO catalog.

In [3]:
import os

# Use the F / T aliases from the setup cell rather than wildcard imports, which
# would shadow Python builtins such as sum, max, and round.

# Load CSV data files
data_path = f"{os.getcwd()}/data"

# Customers table
customers_df = (
    spark.read.option("header", True)
    .option("inferSchema", True)
    .csv(f"file://{data_path}/customers.csv")
)

# Orders table
orders_df = (
    spark.read.option("header", True)
    .option("inferSchema", True)
    .csv(f"file://{data_path}/orders.csv")
)

# Products table
products_df = (
    spark.read.option("header", True)
    .option("inferSchema", True)
    .csv(f"file://{data_path}/products.csv")
)

print("Data loaded successfully:")
print(f"Customers: {customers_df.count()} rows")
print(f"Orders: {orders_df.count()} rows")
print(f"Products: {products_df.count()} rows")

25/12/11 19:46:42 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties


Data loaded successfully:
Customers: 10 rows
Orders: 15 rows
Products: 10 rows
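With `inferSchema` enabled, Spark makes an extra pass over each file to pick a type for every column, widening when rows disagree (a column holding both `1` and `2.5` becomes a double). The standard-library sketch below imitates that idea on three order rows taken from the sample output later in this notebook. It is a simplified illustration, not Spark's actual inference code: it recognises only integers, doubles, one timestamp format, and strings, and the type names are loosely modeled on Spark's.

```python
import csv
import io
from datetime import datetime


def _infer_value_type(text):
    """Return the narrowest type name that can parse `text` (simplified)."""
    parsers = (
        ("int", int),
        ("double", float),
        ("timestamp", lambda s: datetime.strptime(s, "%Y-%m-%d %H:%M:%S")),
    )
    for type_name, parse in parsers:
        try:
            parse(text)
            return type_name
        except ValueError:
            continue
    return "string"


def _merge(a, b):
    # Identical types stay as-is; int and double widen to double; anything else is a string
    if a == b:
        return a
    if {a, b} == {"int", "double"}:
        return "double"
    return "string"


def infer_schema(csv_text):
    reader = csv.DictReader(io.StringIO(csv_text))
    schema = {name: None for name in reader.fieldnames}
    for row in reader:
        for name, value in row.items():
            if value == "":
                continue  # empty fields are treated as nulls and don't constrain the type
            t = _infer_value_type(value)
            schema[name] = t if schema[name] is None else _merge(schema[name], t)
    # Columns that were entirely null fall back to string
    return {name: t or "string" for name, t in schema.items()}


sample = """order_id,customer_id,order_ts,symbol,quantity,price_usd
101,1,2025-01-01 10:01:00,MSFT,100,310.5
102,2,2025-01-01 10:02:00,AAPL,50,215.1
103,1,2025-01-01 10:03:00,AMZN,20,140.0
"""

print(infer_schema(sample))
```

Note that `140.0` is typed as a double, not an integer, because it contains a decimal point; explicit schemas avoid surprises like this in production pipelines.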


## Create Hive Tables

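Next, save each DataFrame as a table in the Hive metastore. `saveAsTable` writes the data files to the Spark warehouse directory, which in this environment is on MinIO, so the tables become queryable through Trino's `minio` catalog.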

In [4]:
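# Save each DataFrame as a managed table in the Hive `default` database.
# Column types come from inferSchema; mode("overwrite") replaces any existing
# table, so this cell can be re-run safely.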

# Customers table
customers_df.write.mode("overwrite").saveAsTable("default.customers")

# Orders table
orders_df.write.mode("overwrite").saveAsTable("default.orders")

# Products table
products_df.write.mode("overwrite").saveAsTable("default.products")

print("Hive tables created successfully in MinIO!")

25/12/11 19:46:48 WARN Base64: JAXB is unavailable. Will fallback to SDK implementation which may be less performant.If you are using Java 9+, you will need to include javax.xml.bind:jaxb-api as a dependency.
25/12/11 19:46:49 WARN SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.


Hive tables created successfully in MinIO!
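Because no explicit `path` option is given, `saveAsTable` creates *managed* tables whose files live under the database's location. In a typical Spark setup, tables in the `default` database sit directly under `spark.sql.warehouse.dir`, while other databases get a `<name>.db` subdirectory. The sketch below encodes that convention, assuming a hypothetical warehouse URI `s3a://warehouse/`; the real location can be read from `spark.conf.get("spark.sql.warehouse.dir")` or from the `Location` row of `DESCRIBE TABLE EXTENDED default.customers`.

```python
def managed_table_location(warehouse_dir, database, table):
    """Expected location of a managed table under Spark's default layout (no explicit path)."""
    base = warehouse_dir.rstrip("/")
    # The `default` database lives at the warehouse root; others get a `<db>.db` subdirectory
    db_dir = base if database == "default" else f"{base}/{database.lower()}.db"
    # Spark stores table names in lowercase
    return f"{db_dir}/{table.lower()}"


# "s3a://warehouse/" is a hypothetical warehouse URI used only for illustration
for table in ("customers", "orders", "products"):
    print(managed_table_location("s3a://warehouse/", "default", table))
```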


## Verify Tables

Let's verify the tables were created and can be queried.

In [7]:
# Verify tables exist
spark.sql("SHOW TABLES IN default").dshow()

# Sample data from each table
print("\nSample customers:")
spark.sql("SELECT * FROM default.customers LIMIT 5").dshow()

print("\nSample orders:")
spark.sql("SELECT * FROM default.orders LIMIT 5").dshow()

print("\nSample products:")
spark.sql("SELECT * FROM default.products LIMIT 5").dshow()

|   | namespace | tableName | isTemporary |
|---|---|---|---|
| 0 | default | customers | False |
| 1 | default | orders | False |
| 2 | default | products | False |



Sample customers:


|   | customer_id | customer_name | country | segment |
|---|---|---|---|---|
| 0 | 1 | Acme Corp | USA | Enterprise |
| 1 | 2 | Globex | USA | SMB |
| 2 | 3 | Initech | Canada | Midmarket |
| 3 | 4 | Stark Industries | USA | Enterprise |
| 4 | 5 | Wayne Enterprises | UK | Enterprise |



Sample orders:


|   | order_id | customer_id | order_ts | symbol | quantity | price_usd |
|---|---|---|---|---|---|---|
| 0 | 101 | 1 | 2025-01-01 10:01:00 | MSFT | 100 | 310.5 |
| 1 | 102 | 2 | 2025-01-01 10:02:00 | AAPL | 50 | 215.1 |
| 2 | 103 | 1 | 2025-01-01 10:03:00 | AMZN | 20 | 140.0 |
| 3 | 104 | 3 | 2025-01-01 10:05:00 | GOOG | 10 | 2900.75 |
| 4 | 105 | 4 | 2025-01-01 10:06:30 | MSFT | 150 | 311.25 |



Sample products:


|   | symbol | sector | industry |
|---|---|---|---|
| 0 | MSFT | Technology | Software |
| 1 | AAPL | Technology | Hardware |
| 2 | AMZN | Consumer | Internet Retail |
| 3 | GOOG | Technology | Internet Services |
| 4 | SNOW | Technology | Data Cloud |
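Instead of eyeballing the `SHOW TABLES` output, the check can be made programmatic. A minimal sketch, assuming the rows are available as `(namespace, tableName, isTemporary)` tuples; in the notebook they could come from `spark.sql("SHOW TABLES IN default").collect()`, since each returned `Row` unpacks like a tuple. The `missing_tables` helper and `EXPECTED_TABLES` set are hypothetical names for this illustration.

```python
# Hypothetical set of tables this notebook is expected to create
EXPECTED_TABLES = {"customers", "orders", "products"}


def missing_tables(show_tables_rows, expected=EXPECTED_TABLES, namespace="default"):
    """Return the expected table names absent from SHOW TABLES output, sorted."""
    present = {
        name
        for ns, name, is_temp in show_tables_rows
        if ns == namespace and not is_temp  # ignore temp views such as those made by reg()
    }
    return sorted(expected - present)


# Rows copied from the SHOW TABLES output above
shown = [
    ("default", "customers", False),
    ("default", "orders", False),
    ("default", "products", False),
]
print(missing_tables(shown))  # []
```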


## Summary

The data is now prepared and ready for the Trino federated analytics demo. The tables are accessible through:

- **Trino**: `minio.default.customers`, `minio.default.orders`, `minio.default.products`
- **MinIO Console**: http://localhost:9001 (login: `minioadmin` / `minioadmin`)

Next steps:
1. Run the KafkaMockStream notebook to generate real-time data
2. Run the TrinoFederatedDemo notebook to showcase cross-system analytics

## Federated Query Example

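To demonstrate Trino's federated capabilities, this single query joins `customers` from PostgreSQL (the `postgresdb` catalog) with `orders` from MinIO (the `minio` catalog) and aggregates the order count and total spend per customer.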
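The query's semantics are worth spelling out: the `LEFT JOIN` keeps every customer even without orders, `COUNT(o.order_id)` counts only matched orders (so such customers get 0), `SUM` over no matched rows yields NULL, and Trino sorts NULLs last by default. The plain-Python sketch below reproduces those rules on the five sample customers and orders shown in the verification step. It uses only that subset of the data, so its totals differ from the full query results; `spend_by_customer` is a hypothetical helper, not part of the demo.

```python
from collections import defaultdict

# Sample rows from the verification step (a subset of the full tables)
customers = [  # (customer_id, customer_name, country, segment)
    (1, "Acme Corp", "USA", "Enterprise"),
    (2, "Globex", "USA", "SMB"),
    (3, "Initech", "Canada", "Midmarket"),
    (4, "Stark Industries", "USA", "Enterprise"),
    (5, "Wayne Enterprises", "UK", "Enterprise"),
]
orders = [  # (order_id, customer_id, quantity, price_usd)
    (101, 1, 100, 310.5),
    (102, 2, 50, 215.1),
    (103, 1, 20, 140.0),
    (104, 3, 10, 2900.75),
    (105, 4, 150, 311.25),
]


def spend_by_customer(customers, orders):
    # Group order amounts (quantity * price_usd) by customer_id
    amounts_by_customer = defaultdict(list)
    for _order_id, customer_id, quantity, price in orders:
        amounts_by_customer[customer_id].append(quantity * price)

    results = []
    for customer_id, name, *_ in customers:
        amounts = amounts_by_customer.get(customer_id, [])  # LEFT JOIN keeps unmatched customers
        results.append({
            "customer_id": customer_id,
            "customer_name": name,
            "order_count": len(amounts),  # COUNT(o.order_id) skips NULLs -> 0
            "total_spent": sum(amounts) if amounts else None,  # SUM of no rows -> NULL
        })
    # ORDER BY total_spent DESC, with NULLs placed last
    results.sort(key=lambda r: (r["total_spent"] is None, -(r["total_spent"] or 0)))
    return results


for row in spend_by_customer(customers, orders):
    print(row)
```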

In [17]:
%%sql federated_results <<
SELECT 
    c.customer_id,
    c.customer_name,
    c.country,
    c.segment,
    COUNT(o.order_id) AS order_count,
    SUM(o.quantity * o.price_usd) AS total_spent
FROM postgresdb.public.customers c
LEFT JOIN minio.default.orders o 
       ON c.customer_id = o.customer_id
GROUP BY 
    c.customer_id,
    c.customer_name,
    c.country,
    c.segment
ORDER BY total_spent DESC

 * trino://admin@trino:8080/minio?protocol=http
Done.
Returning data to local variable federated_results


In [18]:
# Display federated query results
federated_df = federated_results.DataFrame()
federated_df

|   | customer_id | customer_name | country | segment | order_count | total_spent |
|---|---|---|---|---|---|---|
| 0 | 1 | Acme Corp | USA | Enterprise | 3 | 71500.0 |
| 1 | 6 | Hooli | USA | SMB | 1 | 62400.0 |
| 2 | 4 | Stark Industries | USA | Enterprise | 2 | 50412.5 |
| 3 | 10 | Veidan Industries | Israel | Enterprise | 1 | 43882.5 |
| 4 | 3 | Initech | Canada | Midmarket | 2 | 32582.5 |
| 5 | 2 | Globex | USA | SMB | 2 | 25305.0 |
| 6 | 9 | Oscorp | USA | SMB | 1 | 21725.0 |
| 7 | 5 | Wayne Enterprises | UK | Enterprise | 1 | 16237.5 |
| 8 | 8 | Cyberdyne Systems | USA | Midmarket | 1 | 4643.75 |
| 9 | 7 | Umbrella Corporation | Japan | Enterprise | 1 | 4275.0 |
