# ModelOps System Prototyping Playground

## Use case

The dataset used in this example setup is based on public data that could consist of streaming flight updates, together with other tabular data that provides metadata on the sources or their measurements.

The data in this example is organized in an EMR cluster with Trino, the Hive metastore, and Apache Iceberg deployed, plus a separate node running PostgreSQL.

The Hive cluster stores a table named `flights`, which contains the flight delay data. The PostgreSQL node stores two tables named `airlines` and `airports`, which provide additional information about airlines and airports for the `flights` table. 

![image.png](https://trino.io/assets/blog/intro-to-hive-connector/trino.png)


## Basic concepts

The Data Sources section in the [documentation](https://trino.io/docs/current/overview/concepts.html#data-sources) provides background on some basic concepts of Trino.

After you read it, you should know:
* The meaning of `connector`, `catalog`, `schema`, and `table`.
* Every catalog is associated with a specific connector. For example, the `hive` catalog is associated with a Hive connector.
* The fully-qualified table name is `<catalog>.<schema>.<table>`.
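
The naming rule in the last bullet can be illustrated with a small helper. This is only a sketch: `TableName` and `parse_table_name` are hypothetical names introduced here, not part of Trino or its client libraries, and the parser deliberately ignores quoted identifiers that contain dots.

```python
from typing import NamedTuple


class TableName(NamedTuple):
    """A fully-qualified Trino table name: <catalog>.<schema>.<table>."""

    catalog: str
    schema: str
    table: str

    def __str__(self) -> str:
        return f"{self.catalog}.{self.schema}.{self.table}"


def parse_table_name(name: str) -> TableName:
    """Split 'catalog.schema.table' into its three parts (hypothetical helper)."""
    parts = name.split(".")
    if len(parts) != 3 or not all(parts):
        raise ValueError(f"expected <catalog>.<schema>.<table>, got {name!r}")
    return TableName(*parts)


flights = parse_table_name("hive.default.flights")
print(flights.catalog, flights.schema, flights.table)
```

Keeping the three parts separate makes it easy to reuse the same catalog and schema when building other statements, such as `SHOW TABLES FROM <catalog>.<schema>`.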

## Setup
Now, let us set up the Trino connection and test whether everything works.
### Step 1. Install pyhive and ipython-sql
[ipython-sql](https://github.com/catherinedevlin/ipython-sql) is a Jupyter notebook extension that lets you run SQL queries directly inside notebook cells. To use it, we also need the Trino driver from [pyhive](https://github.com/dropbox/PyHive). You can install both with pip or Anaconda.
- Open a terminal and type `pip install pyhive ipython-sql` or `conda install -c conda-forge pyhive ipython-sql`.

### Step 2. Download Connection Config

Download the config file [trino.ini](https://coursys.sfu.ca/2023sp-cmpt-733-g1/pages/Presto_Config) and put it in the `conf` folder.

### Step 3. Connect to Trino
Now we connect to Trino. We first load the `ipython-sql` module and the `get_connection_string` function from the `load_config.py` file. Run the cell below:

In [1]:
from load_config import get_connection_string
%load_ext sql

In [68]:
import trino

conn = trino.dbapi.connect(
    host='cs-galapagos.cmpt.sfu.ca',
    port=443,
    user='hadoop',
    # password=''
    # catalog='mycatalog',
    # schema='myschema'
)
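
A connection object like `conn` above follows the Python DB-API 2.0 interface, so queries go through a cursor. The helper below is a minimal sketch of that pattern; `run_query` is a hypothetical name, and the demo uses an in-memory `sqlite3` connection purely as a stand-in so the example runs without access to the cluster.

```python
import sqlite3
from typing import Any


def run_query(conn: Any, sql: str) -> tuple[list[str], list[tuple]]:
    """Run `sql` on a DB-API 2.0 connection and return (column names, rows)."""
    cur = conn.cursor()
    try:
        cur.execute(sql)
        rows = [tuple(row) for row in cur.fetchall()]
        # Per DB-API 2.0, the first item of each description entry is the column name
        columns = [col[0] for col in cur.description]
    finally:
        cur.close()
    return columns, rows


# Stand-in connection (assumption: used here only so the sketch is runnable offline)
demo_conn = sqlite3.connect(":memory:")
columns, rows = run_query(demo_conn, "SELECT 1 AS one, 'hive' AS name")
print(columns, rows)
demo_conn.close()
```

With a reachable Trino server, the same helper would presumably be called as `run_query(conn, "SHOW CATALOGS")`.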


In [8]:
print(get_connection_string("conf/trino.ini"))
print("postgresql://username:password@hostname/dbname")

postgresql://hadoop:MjU5MjUzYWFhNmIxODJjNTQyNzBiY2Fm@ec2-54-82-165-18.compute-1.amazonaws.com:5432/hive
postgresql://username:password@hostname/dbname
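
The `load_config.py` helper is not shown in this document. As a rough sketch of how such a function could assemble a URL of the form printed above, the following reads an INI file with `configparser`. The function name `build_connection_string`, the `[trino]` section, and every key name are assumptions made for illustration; the real `trino.ini` layout may differ.

```python
import configparser
import tempfile
from pathlib import Path
from urllib.parse import quote


def build_connection_string(path: str, section: str = "trino") -> str:
    """Build a SQLAlchemy-style URL from an INI file (hypothetical layout)."""
    parser = configparser.ConfigParser(interpolation=None)
    if not parser.read(path):
        raise FileNotFoundError(path)
    cfg = parser[section]
    # Percent-encode credentials so characters like '@' or ':' cannot break the URL
    user = quote(cfg["user"], safe="")
    password = cfg.get("password", "")
    auth = f"{user}:{quote(password, safe='')}" if password else user
    return f"{cfg['dialect']}://{auth}@{cfg['host']}:{cfg['port']}/{cfg['database']}"


# Demo with placeholder values written to a temporary file
with tempfile.TemporaryDirectory() as tmp:
    ini = Path(tmp) / "example.ini"
    ini.write_text(
        "[trino]\n"
        "dialect = postgresql\n"
        "user = username\n"
        "password = password\n"
        "host = hostname\n"
        "port = 5432\n"
        "database = dbname\n"
    )
    demo_url = build_connection_string(str(ini))

print(demo_url)
```

Keeping credentials in a file outside the repository, as the notebook does, avoids hard-coding them in cells.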


Next, we load the config file, get the connection string, and configure `ipython-sql`. Note that we set `autolimit` to 200, which means that each query result fetches at most 200 rows.

In [69]:
%reload_ext sql



In [None]:
# The connection configuration is read from a separate file, not in this repo.
cstr = get_connection_string("conf/trino.ini")

%sql $cstr


'Connected: hadoop@minio/default'
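
The connection message above names the user, catalog, and schema. A Trino SQLAlchemy URL has the shape `trino://<user>@<host>:<port>/<catalog>/<schema>`, and the sketch below splits one into its parts with the standard library. `describe_connection` is a hypothetical helper written for this illustration, not part of `ipython-sql` or the Trino client.

```python
from urllib.parse import urlsplit


def describe_connection(url: str) -> dict:
    """Break a trino:// URL into user, host, port, catalog, and schema."""
    parts = urlsplit(url)
    # The URL path holds the default catalog and schema: /<catalog>/<schema>
    path = [segment for segment in parts.path.split("/") if segment]
    return {
        "dialect": parts.scheme,
        "user": parts.username,
        "host": parts.hostname,
        "port": parts.port,
        "catalog": path[0] if len(path) > 0 else None,
        "schema": path[1] if len(path) > 1 else None,
    }


info = describe_connection("trino://hadoop@cs-galapagos.cmpt.sfu.ca:443/minio/default")
print(info)
```

The catalog and schema in the URL are only defaults for unqualified names; a query can still address any table by its fully-qualified name.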

In [281]:
%config SqlMagic.autocommit=False
%config SqlMagic.displaycon = False
%config SqlMagic.autolimit = 200



### Step 4. Test Connection
We're ready to issue queries. The cell below shows how to display catalogs in Trino. Run it and see whether everything works:

In [282]:
%sql show catalogs

   trino://hadoop@cs-galapagos.cmpt.sfu.ca:443/hive/default
 * trino://hadoop@cs-galapagos.cmpt.sfu.ca:443/minio/default
   trino://hadoop@localhost:8080/hive/default
   trino://hadoop@localhost:9090/hive/default
   trino://hadoop@localhost:9091/hive/default
Done.


Catalog
hive
iceberg
system
tpcds
tpch


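You should see `hive` in the catalogs, and `postgresql` as well if the PostgreSQL node is registered with this Trino deployment (it does not appear in the output above). They represent the Hive and PostgreSQL connectors, respectively. Now, we list the schemas in Hive using `SHOW SCHEMAS FROM <catalog>`: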

In [283]:
%sql SHOW SCHEMAS FROM hive

   trino://hadoop@cs-galapagos.cmpt.sfu.ca:443/hive/default
 * trino://hadoop@cs-galapagos.cmpt.sfu.ca:443/minio/default
   trino://hadoop@localhost:8080/hive/default
   trino://hadoop@localhost:9090/hive/default
   trino://hadoop@localhost:9091/hive/default
Done.


Schema
default
information_schema


You should see the `default` schema. We list its tables:

In [284]:
%sql SHOW TABLES FROM hive.default

   trino://hadoop@cs-galapagos.cmpt.sfu.ca:443/hive/default
 * trino://hadoop@cs-galapagos.cmpt.sfu.ca:443/minio/default
   trino://hadoop@localhost:8080/hive/default
   trino://hadoop@localhost:9090/hive/default
   trino://hadoop@localhost:9091/hive/default
Done.


Table
airports
airports2
flights


In [285]:
%sql SHOW SCHEMAS FROM iceberg

   trino://hadoop@cs-galapagos.cmpt.sfu.ca:443/hive/default
 * trino://hadoop@cs-galapagos.cmpt.sfu.ca:443/minio/default
   trino://hadoop@localhost:8080/hive/default
   trino://hadoop@localhost:9090/hive/default
   trino://hadoop@localhost:9091/hive/default
Done.


Schema
default
information_schema


You should find the `flights` table in the `SHOW TABLES` result above. As noted earlier, a table is identified by `<catalog>.<schema>.<table>`, so you can access the `flights` table directly as `hive.default.flights`. Finally, let us look at the data. We query the `airports` table and save the result as a pandas DataFrame; because `autolimit` is set to 200, at most 200 rows are returned:

In [286]:
result = %sql SELECT * FROM hive.default.airports
#SELECT * FROM hive.default.airports
df = result.DataFrame()


   trino://hadoop@cs-galapagos.cmpt.sfu.ca:443/hive/default
 * trino://hadoop@cs-galapagos.cmpt.sfu.ca:443/minio/default
   trino://hadoop@localhost:8080/hive/default
   trino://hadoop@localhost:9090/hive/default
   trino://hadoop@localhost:9091/hive/default
Done.


In [287]:
df.shape

(200, 4)

In [293]:
result = %sql SELECT * FROM hive.default.flights WHERE depdelay is not NULL
#SELECT * FROM hive.default.airports
#flights_df = result.DataFrame()


   trino://hadoop@cs-galapagos.cmpt.sfu.ca:443/hive/default
 * trino://hadoop@cs-galapagos.cmpt.sfu.ca:443/minio/default
   trino://hadoop@localhost:8080/hive/default
   trino://hadoop@localhost:9090/hive/default
   trino://hadoop@localhost:9091/hive/default
Done.


In [309]:
flights_df = %sql SELECT * FROM hive.default.flights WHERE depdelay is not NULL

   trino://hadoop@cs-galapagos.cmpt.sfu.ca:443/hive/default
 * trino://hadoop@cs-galapagos.cmpt.sfu.ca:443/minio/default
   trino://hadoop@localhost:8080/hive/default
   trino://hadoop@localhost:9090/hive/default
   trino://hadoop@localhost:9091/hive/default
Done.


In [308]:
%config SqlMagic.autolimit=0
%config SqlMagic.autopandas=True

In [311]:
flights_df

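| | schema | dayofmonth | dayofweek | carrier | originairportid | destairportid | depdelay | arrdelay |
|---|---|---|---|---|---|---|---|---|
| 0 | | 19 | 5 | DL | 11433 | 13303 | -3 | 1 |
| 1 | | 19 | 5 | DL | 14869 | 12478 | 0 | -8 |
| 2 | | 19 | 5 | DL | 14057 | 14869 | -4 | -15 |
| 3 | | 19 | 5 | DL | 15016 | 11433 | 28 | 24 |
| 4 | | 19 | 5 | DL | 11193 | 12892 | -6 | -11 |
| ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 2702213 | | 19 | 6 | DL | 13204 | 14869 | 1 | -3 |
| 2702214 | | 19 | 6 | DL | 10397 | 13495 | -2 | -3 |
| 2702215 | | 19 | 6 | DL | 10140 | 10397 | 18 | 9 |
| 2702216 | | 19 | 6 | DL | 10397 | 13244 | 10 | 7 |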
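
Once the rows are available locally, simple delay statistics are easy to compute. The sketch below uses only the standard library and the first five rows shown in the output above, so it runs without the cluster. The variable names are illustrative, and the numbers describe this five-row sample only, not the full table.

```python
import csv
import io
from statistics import mean

# First five rows of the result shown above (index and empty `schema` column omitted)
SAMPLE = """dayofmonth,dayofweek,carrier,originairportid,destairportid,depdelay,arrdelay
19,5,DL,11433,13303,-3,1
19,5,DL,14869,12478,0,-8
19,5,DL,14057,14869,-4,-15
19,5,DL,15016,11433,28,24
19,5,DL,11193,12892,-6,-11
"""

rows = list(csv.DictReader(io.StringIO(SAMPLE)))
dep_delays = [float(row["depdelay"]) for row in rows]
arr_delays = [float(row["arrdelay"]) for row in rows]

mean_dep = mean(dep_delays)  # average departure delay in the sample
mean_arr = mean(arr_delays)  # average arrival delay in the sample
late_departures = sum(1 for delay in dep_delays if delay > 0)

print(f"mean depdelay={mean_dep}, mean arrdelay={mean_arr}, late departures={late_departures}")
```

On the full `flights_df` DataFrame, the equivalent per-carrier average would be `flights_df.groupby("carrier")["depdelay"].mean()`.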


If the SQL is long, you can use `%%sql` to run a multi-line query. The `result <<` prefix assigns the query result to the variable `result`:

In [247]:
%%sql 
result << 
SELECT * 
FROM hive.default.flights 
LIMIT 10

   trino://hadoop@cs-galapagos.cmpt.sfu.ca:443/hive/default
 * trino://hadoop@cs-galapagos.cmpt.sfu.ca:443/minio/default
   trino://hadoop@localhost:8080/hive/default
   trino://hadoop@localhost:9090/hive/default
   trino://hadoop@localhost:9091/hive/default
(trino.exceptions.TrinoUserError) TrinoUserError(type=USER_ERROR, name=NOT_SUPPORTED, message="Unsupported Trino column type (integer) for Parquet column ([carrier] optional binary carrier (STRING))", query_id=20230410_205953_00024_5utfz)
[SQL: SELECT * FROM hive.default.flights LIMIT 10]
(Background on this error at: https://sqlalche.me/e/14/f405)
