# How to... read data from CSV files and store it in a Trino database

In [1]:
# Make the esg_matching API importable: it lives three directories above this notebook.
# The membership check keeps the cell idempotent, so re-running it does not append
# duplicate entries to sys.path.
import sys

API_ROOT = '../../../'
if API_ROOT not in sys.path:
    sys.path.append(API_ROOT)

In [2]:
# Import the module for connection to a trino database
from esg_matching.engine.connectors.trino import TrinoConnector

In [3]:
# Import the modules for file management
from esg_matching.file_reader.file import File
from esg_matching.file_reader.csv_reader import FileReaderCsv

In [4]:
# Import the modules for the etl processing: reading, transformation and loading data to a database
from esg_matching.processing.etl import EtlProcessing

## 1. Database setup

In [5]:
import os

# Connection parameters come from the environment so that no credentials are stored
# in the notebook; a missing variable raises KeyError.
user_trino, pwd_trino, host_trino, port_text = (
    os.environ[name] for name in ('TRINO_USER', 'TRINO_PASSWD', 'TRINO_HOST', 'TRINO_PORT')
)
port_trino = int(port_text)

In [6]:
# A TrinoConnector instance represents the database connection; it is configured in the next cell.
db_conn = TrinoConnector()

In [7]:
# Configure the connector with the credentials and endpoint read from the environment,
# then open the connection with connect().
# - catalog: the Trino catalog to work in.
# - show_sql_statement: when True, the SQL statements are echoed (printed) to the
#   default output channel, as seen in the log lines below.
db_conn.username = user_trino
db_conn.user_password = pwd_trino
db_conn.host_url = host_trino
db_conn.port_number = port_trino
db_conn.catalog = 'osc_datacommons_iceberg_dev'
db_conn.show_sql_statement = True
db_conn.connect()

2022-07-21 15:48:12,912 INFO sqlalchemy.engine.Engine SELECT version()
2022-07-21 15:48:12,916 INFO sqlalchemy.engine.Engine [dialect trino+rest does not support caching 0.00424s] ()


In [8]:
# Check whether the connection was established
db_conn.is_connected()

True

## 2. File setup

In [9]:
# JSON settings file for Referential 1; it names the csv file that will be loaded
file1_settings = (
    '../../../tests/data/howto/trino/'
    'test_referential1_trino.json'
)
file1_settings

'../../../tests/data/howto/trino/test_referential1_trino.json'

In [10]:
# Build the File object from the JSON settings (its properties are inspected in the next cell)
file_obj = File(file1_settings)

In [11]:
# Inspect the File object: the csv file it points to and the JSON settings it was built from
print(f'Filename:{file_obj.filename}')
print(f'Json Settings:{file_obj.filename_settings}')

Filename:../../../tests/data/test_referential1.csv
Json Settings:../../../tests/data/howto/trino/test_referential1_trino.json


## 3. Read a csv file and load its content to the database

The Esg-Entity-Matching library provides a FileReaderCsv class that understands the content of CSV files. 
It also provides an EtlProcessing class that combines a file, a database connector and a reader to perform the complete pipeline of reading, transforming and loading data into a database.

In [12]:
# Create a file reader object for csv files
csv_reader_obj = FileReaderCsv()

In [13]:
# The ETL processor is bound to the database connector: data will be loaded through db_conn
etl_proc_obj = EtlProcessing(db_conn)

In [14]:
# Run the whole pipeline by passing the File and the csv FileReader to load_file_to_db():
# the file is read, its data transformed and loaded through the Trino connector that was
# given to EtlProcessing. The ETL process returns a database source object (DbDataSource).
db_source = etl_proc_obj.load_file_to_db(file_obj, csv_reader_obj)

2022-07-21 15:48:24,783 INFO sqlalchemy.engine.Engine SELECT "table_name"
FROM "information_schema"."tables"
WHERE "table_schema" = ?
2022-07-21 15:48:24,784 INFO sqlalchemy.engine.Engine [dialect trino+rest does not support caching 0.00151s] ('esg_matching',)
2022-07-21 15:48:26,319 INFO sqlalchemy.engine.Engine SELECT "table_name"
FROM "information_schema"."tables"
WHERE "table_schema" = ?
2022-07-21 15:48:26,320 INFO sqlalchemy.engine.Engine [dialect trino+rest does not support caching 0.00086s] ('esg_matching',)
2022-07-21 15:48:27,789 INFO sqlalchemy.engine.Engine SELECT "table_name"
FROM "information_schema"."tables"
WHERE "table_schema" = ?
  AND "table_name" = ?
2022-07-21 15:48:27,790 INFO sqlalchemy.engine.Engine [dialect trino+rest does not support caching 0.00082s] ('esg_matching', 'esg_match_tgt2')
2022-07-21 15:48:29,149 INFO sqlalchemy.engine.Engine SELECT
    "column_name",
    "data_type",
    "column_default",
    UPPER("is_nullable") AS "is_nullable"
FROM "informatio

  metadata.reflect(self._engine)
  metadata.reflect(self._engine)
  metadata.reflect(self._engine)
  metadata.reflect(self._engine)


2022-07-21 15:48:34,677 INFO sqlalchemy.engine.Engine ROLLBACK
2022-07-21 15:48:34,678 INFO sqlalchemy.engine.Engine SELECT "table_name"
FROM "information_schema"."tables"
WHERE "table_schema" = ?
  AND "table_name" = ?
2022-07-21 15:48:34,679 INFO sqlalchemy.engine.Engine [dialect trino+rest does not support caching 0.00091s] ('esg_matching', 'esg_match_ref')
2022-07-21 15:48:35,960 INFO sqlalchemy.engine.Engine SELECT
    "column_name",
    "data_type",
    "column_default",
    UPPER("is_nullable") AS "is_nullable"
FROM "information_schema"."columns"
WHERE "table_schema" = ?
  AND "table_name" = ?
ORDER BY "ordinal_position" ASC
2022-07-21 15:48:35,961 INFO sqlalchemy.engine.Engine [dialect trino+rest does not support caching 0.00103s] ('esg_matching', 'esg_match_ref')
2022-07-21 15:48:37,522 INFO sqlalchemy.engine.Engine SELECT "table_name"
FROM "information_schema"."tables"
WHERE "table_schema" = ?
  AND "table_name" = ?
2022-07-21 15:48:37,523 INFO sqlalchemy.engine.Engine [diale

  metadata.reflect(self._engine)
  metadata.reflect(self._engine)
  metadata.reflect(self._engine)
  metadata.reflect(self._engine)


2022-07-21 15:48:41,700 INFO sqlalchemy.engine.Engine ROLLBACK
2022-07-21 15:48:41,702 INFO sqlalchemy.engine.Engine SELECT "table_name"
FROM "information_schema"."tables"
WHERE "table_schema" = ?
  AND "table_name" = ?
2022-07-21 15:48:41,702 INFO sqlalchemy.engine.Engine [dialect trino+rest does not support caching 0.00102s] ('esg_matching', 'esg_match_tgt1')
2022-07-21 15:48:42,956 INFO sqlalchemy.engine.Engine SELECT
    "column_name",
    "data_type",
    "column_default",
    UPPER("is_nullable") AS "is_nullable"
FROM "information_schema"."columns"
WHERE "table_schema" = ?
  AND "table_name" = ?
ORDER BY "ordinal_position" ASC
2022-07-21 15:48:42,956 INFO sqlalchemy.engine.Engine [dialect trino+rest does not support caching 0.00080s] ('esg_matching', 'esg_match_tgt1')
2022-07-21 15:48:44,449 INFO sqlalchemy.engine.Engine SELECT "table_name"
FROM "information_schema"."tables"
WHERE "table_schema" = ?
  AND "table_name" = ?
2022-07-21 15:48:44,450 INFO sqlalchemy.engine.Engine [dia

  metadata.reflect(self._engine)
  metadata.reflect(self._engine)
  metadata.reflect(self._engine)
  metadata.reflect(self._engine)


2022-07-21 15:48:48,440 INFO sqlalchemy.engine.Engine ROLLBACK
2022-07-21 15:48:48,441 INFO sqlalchemy.engine.Engine SELECT "table_name"
FROM "information_schema"."tables"
WHERE "table_schema" = ?
  AND "table_name" = ?
2022-07-21 15:48:48,442 INFO sqlalchemy.engine.Engine [dialect trino+rest does not support caching 0.00130s] ('esg_matching', 'esg_matching')
2022-07-21 15:48:49,964 INFO sqlalchemy.engine.Engine SELECT
    "column_name",
    "data_type",
    "column_default",
    UPPER("is_nullable") AS "is_nullable"
FROM "information_schema"."columns"
WHERE "table_schema" = ?
  AND "table_name" = ?
ORDER BY "ordinal_position" ASC
2022-07-21 15:48:49,965 INFO sqlalchemy.engine.Engine [dialect trino+rest does not support caching 0.00113s] ('esg_matching', 'esg_matching')
2022-07-21 15:48:52,116 INFO sqlalchemy.engine.Engine SELECT "table_name"
FROM "information_schema"."tables"
WHERE "table_schema" = ?
  AND "table_name" = ?
2022-07-21 15:48:52,117 INFO sqlalchemy.engine.Engine [dialect

  metadata.reflect(self._engine)
  metadata.reflect(self._engine)
  metadata.reflect(self._engine)
  metadata.reflect(self._engine)


2022-07-21 15:48:55,951 INFO sqlalchemy.engine.Engine ROLLBACK
2022-07-21 15:48:55,952 INFO sqlalchemy.engine.Engine SELECT "table_name"
FROM "information_schema"."tables"
WHERE "table_schema" = ?
  AND "table_name" = ?
2022-07-21 15:48:55,953 INFO sqlalchemy.engine.Engine [dialect trino+rest does not support caching 0.00094s] ('esg_matching', 'esg_no_matching')
2022-07-21 15:48:57,197 INFO sqlalchemy.engine.Engine SELECT
    "column_name",
    "data_type",
    "column_default",
    UPPER("is_nullable") AS "is_nullable"
FROM "information_schema"."columns"
WHERE "table_schema" = ?
  AND "table_name" = ?
ORDER BY "ordinal_position" ASC
2022-07-21 15:48:57,198 INFO sqlalchemy.engine.Engine [dialect trino+rest does not support caching 0.00090s] ('esg_matching', 'esg_no_matching')
2022-07-21 15:48:58,745 INFO sqlalchemy.engine.Engine SELECT "table_name"
FROM "information_schema"."tables"
WHERE "table_schema" = ?
  AND "table_name" = ?
2022-07-21 15:48:58,747 INFO sqlalchemy.engine.Engine [d

  metadata.reflect(self._engine)
  metadata.reflect(self._engine)
  metadata.reflect(self._engine)
  metadata.reflect(self._engine)


2022-07-21 15:49:02,613 INFO sqlalchemy.engine.Engine ROLLBACK
2022-07-21 15:49:02,615 INFO sqlalchemy.engine.Engine SELECT "table_name"
FROM "information_schema"."tables"
WHERE "table_schema" = ?
  AND "table_name" = ?
2022-07-21 15:49:02,616 INFO sqlalchemy.engine.Engine [dialect trino+rest does not support caching 0.00122s] ('esg_matching', 'matching')
2022-07-21 15:49:04,088 INFO sqlalchemy.engine.Engine SELECT
    "column_name",
    "data_type",
    "column_default",
    UPPER("is_nullable") AS "is_nullable"
FROM "information_schema"."columns"
WHERE "table_schema" = ?
  AND "table_name" = ?
ORDER BY "ordinal_position" ASC
2022-07-21 15:49:04,089 INFO sqlalchemy.engine.Engine [dialect trino+rest does not support caching 0.00115s] ('esg_matching', 'matching')
2022-07-21 15:49:05,564 INFO sqlalchemy.engine.Engine SELECT "table_name"
FROM "information_schema"."tables"
WHERE "table_schema" = ?
  AND "table_name" = ?
2022-07-21 15:49:05,565 INFO sqlalchemy.engine.Engine [dialect trino+r

  metadata.reflect(self._engine)
  metadata.reflect(self._engine)
  metadata.reflect(self._engine)
  metadata.reflect(self._engine)


2022-07-21 15:49:10,520 INFO sqlalchemy.engine.Engine ROLLBACK
2022-07-21 15:49:10,521 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2022-07-21 15:49:10,522 INFO sqlalchemy.engine.Engine 
DROP TABLE esg_matching.esg_match_ref
2022-07-21 15:49:10,523 INFO sqlalchemy.engine.Engine [no key 0.00123s] ()
2022-07-21 15:49:13,854 INFO sqlalchemy.engine.Engine COMMIT
2022-07-21 15:49:13,856 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2022-07-21 15:49:13,857 INFO sqlalchemy.engine.Engine 
CREATE TABLE esg_matching.esg_match_ref (
	unique_id VARCHAR, 
	isin VARCHAR(12), 
	company VARCHAR(100), 
	country VARCHAR(100)
)


2022-07-21 15:49:13,859 INFO sqlalchemy.engine.Engine [no key 0.00137s] ()
2022-07-21 15:49:14,978 INFO sqlalchemy.engine.Engine COMMIT
{'unique_id': '1', 'isin': 'SK1120005824', 'company': 'CENTRAL PERK', 'country': 'SK'}
2022-07-21 15:49:15,006 INFO sqlalchemy.engine.Engine INSERT INTO esg_matching.esg_match_ref (unique_id, isin, company, country) VALUES (?, ?, ?, ?)
2022-0

## 4. Report on the ETL Process

In [15]:
# Print the ETL Processing Report
etl_proc_obj.print_report()

-------------------------------------- ETL PROCESSING REPORT ---------------------------------------
Description: Details of the ETL process performed on [ds_ref] data source.
Datetime:2022-07-21 15:50:48
----------------------------------------------------------------------------------------------------
File Name: ../../../tests/data/test_referential1.csv
Columns in the File: 4
Columns read from File: 4
Lines Extracted from File: 9


## 5. Checking the attribute names of DbDataSource

There are three methods to check the column or attribute names of the DbDataSource object:
1. Use get_original_attribute_names(): to retrieve the original attribute names of the columns in the csv file
2. Use get_attribute_names(): to retrieve the attribute names of the database table
3. Use get_primary_keys(): to retrieve the attribute names of the primary keys in the database table (not demonstrated below)

In [16]:
# Column names exactly as they were read from the csv file
db_source.get_original_attribute_names()

['unique_id', 'isin', 'company', 'country']

In [17]:
# Column names of the table stored in the database
db_source.get_attribute_names()

['unique_id', 'isin', 'company', 'country']

## 6. Checking the Data Source

In [18]:
# Name of the data source and of the database table behind it
print(f'Data Source Name: {db_source.name}, Table name: {db_source.table_name}')

Data Source Name: ds_ref, Table name: esg_match_ref


In [19]:
# Count every row of the table
result = db_source.get_total_entries()
print(f'Total entries in table {db_source.table_name} = {result}')

2022-07-21 15:50:54,903 INFO sqlalchemy.engine.Engine SELECT count(*) AS count_1 
FROM esg_matching.esg_match_ref
2022-07-21 15:50:54,905 INFO sqlalchemy.engine.Engine [dialect trino+rest does not support caching 0.00274s] ()
Total entries in table esg_match_ref = 9


In [20]:
# Count the rows holding a value in the 'isin' column (the generated SQL is count(isin), so NULLs are skipped)
result = db_source.get_total_entries_by_column('isin')
print(f'Total entries by ISIN = {result}')

2022-07-21 15:50:56,086 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2022-07-21 15:50:56,091 INFO sqlalchemy.engine.Engine SELECT count(esg_matching.esg_match_ref.isin) AS count_1 
FROM esg_matching.esg_match_ref
LIMIT ?
2022-07-21 15:50:56,092 INFO sqlalchemy.engine.Engine [dialect trino+rest does not support caching 0.00120s] (1,)
2022-07-21 15:50:58,051 INFO sqlalchemy.engine.Engine ROLLBACK
Total entries by ISIN = 7


In [21]:
# Same count, but only distinct 'isin' values are considered (count(DISTINCT isin))
result = db_source.get_total_entries_by_column('isin', distinct_values=True)
print(f'Total entries by ISIN with distinct values = {result}')

2022-07-21 15:50:58,063 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2022-07-21 15:50:58,066 INFO sqlalchemy.engine.Engine SELECT count(DISTINCT esg_matching.esg_match_ref.isin) AS count_1 
FROM esg_matching.esg_match_ref
LIMIT ?
2022-07-21 15:50:58,067 INFO sqlalchemy.engine.Engine [dialect trino+rest does not support caching 0.00134s] (1,)
2022-07-21 15:50:59,988 INFO sqlalchemy.engine.Engine ROLLBACK
Total entries by ISIN with distinct values = 6


## 7. Checking the content of the DbDataSource

The get_data() method of the DbDataSource object performs a full select on the table and returns a list of tuples. Each item of the list is one row of the table, and each element of a tuple is the value of one column.

In [22]:
# Fetch every row of the table as a list of tuples
# (equivalent to SELECT * FROM TABLE_NAME)
lst_result = db_source.get_data()
lst_result

2022-07-21 15:51:00,010 INFO sqlalchemy.engine.Engine SELECT esg_matching.esg_match_ref.unique_id, esg_matching.esg_match_ref.isin, esg_matching.esg_match_ref.company, esg_matching.esg_match_ref.country 
FROM esg_matching.esg_match_ref
2022-07-21 15:51:00,011 INFO sqlalchemy.engine.Engine [dialect trino+rest does not support caching 0.00152s] ()


[('9', 'US0126531013', 'SPECTRE 33 SUBSIDIARY', 'USA'),
 ('5', 'CH0012221716', 'Bluth company', 'CHE'),
 ('6', 'US0200021014', 'InGen', 'usa'),
 ('7', 'US0231351067', 'Stark Industries', 'us'),
 ('2', None, 'HONEYDUKES', 'UNITED STATES OF AMERICA'),
 ('3', None, 'STARCOURT MALL', 'AUSTRIA'),
 ('1', 'SK1120005824', 'CENTRAL PERK', 'SK'),
 ('8', 'US0126531013', 'SPECTRE', 'USA'),
 ('4', 'GB00B1YW4409', 'STERLING COOPER', 'GBR')]

The get_data_as_df() method of the DbDataSource also performs a select in the table, but returns a pandas dataframe as result.

In [23]:
# Fetch the same rows, this time as a pandas DataFrame
df_result = db_source.get_data_as_df()
df_result

2022-07-21 15:51:01,194 INFO sqlalchemy.engine.Engine SELECT esg_matching.esg_match_ref.unique_id, esg_matching.esg_match_ref.isin, esg_matching.esg_match_ref.company, esg_matching.esg_match_ref.country 
FROM esg_matching.esg_match_ref
2022-07-21 15:51:01,195 INFO sqlalchemy.engine.Engine [dialect trino+rest does not support caching 0.00126s] ()


Unnamed: 0,unique_id,isin,company,country
0,5,CH0012221716,Bluth company,CHE
1,1,SK1120005824,CENTRAL PERK,SK
2,6,US0200021014,InGen,usa
3,3,,STARCOURT MALL,AUSTRIA
4,7,US0231351067,Stark Industries,us
5,8,US0126531013,SPECTRE,USA
6,2,,HONEYDUKES,UNITED STATES OF AMERICA
7,9,US0126531013,SPECTRE 33 SUBSIDIARY,USA
8,4,GB00B1YW4409,STERLING COOPER,GBR


## 8. Drop the table using DbDataSource object

In [None]:
# Remove the table behind this data source from the database
db_source.drop_table()

## 9. Close database connection

In [None]:
# Close the connection to the Trino database
db_conn.disconnect()

In [None]:
# Check the connection state again after disconnecting
db_conn.is_connected()