# Loading from Iceberg tables through the [TigerGraph Spark Connector](https://docs.tigergraph.com/tigergraph-server/current/data-loading/load-from-spark-dataframe)

## Data preparation for Iceberg table.
The cell below creates Iceberg tables that correspond to the graph schema and then inserts sample data into them. Skip this step if you are using another data source.

In [None]:
# Account table: one row per account, holding its name and blocked flag.
account_ddl = """
    CREATE TABLE demo.financialGraph.Account (
        name STRING,
        isBlocked BOOLEAN
    ) USING iceberg
"""
account_rows = """
    INSERT INTO demo.financialGraph.Account (name, isBlocked) VALUES
    ('Scott', FALSE),
    ('Jenny', FALSE),
    ('Steven', TRUE),
    ('Paul', FALSE),
    ('Ed', FALSE)
"""
spark.sql(account_ddl)   # create the Iceberg table
spark.sql(account_rows)  # fill it with the sample accounts

# Read the table back so the inserted rows can be checked by eye.
print("Displaying data from Account table:")
account_df = spark.sql("SELECT * FROM demo.financialGraph.Account")
account_df.show()
# Expected output:
# +------+---------+
# |  name|isBlocked|
# +------+---------+
# | Scott|    false|
# | Jenny|    false|
# |Steven|     true|
# |  Paul|    false|
# |    Ed|    false|
# +------+---------+


# City table: a single column holding the city name.
city_ddl = """
    CREATE TABLE demo.financialGraph.City (
        name STRING
    ) USING iceberg
"""
city_rows = """
    INSERT INTO demo.financialGraph.City (name) VALUES
    ('New York'),
    ('Gainesville'),
    ('San Francisco')
"""
spark.sql(city_ddl)   # create the Iceberg table
spark.sql(city_rows)  # fill it with the sample cities

# Read the table back so the inserted rows can be checked by eye.
print("Displaying data from City table:")
city_df = spark.sql("SELECT * FROM demo.financialGraph.City")
city_df.show()
# Expected output:
# +-------------+
# |         name|
# +-------------+
# |     New York|
# |  Gainesville|
# |San Francisco|
# +-------------+


# Phone table: one row per phone number, with its blocked flag.
phone_ddl = """
    CREATE TABLE demo.financialGraph.Phone (
        number STRING,
        isBlocked BOOLEAN
    ) USING iceberg
"""
phone_rows = """
    INSERT INTO demo.financialGraph.Phone (number, isBlocked) VALUES
    ('718-245-5888', FALSE),
    ('650-658-9867', TRUE),
    ('352-871-8978', FALSE)
"""
spark.sql(phone_ddl)   # create the Iceberg table
spark.sql(phone_rows)  # fill it with the sample phone numbers

# Read the table back so the inserted rows can be checked by eye.
print("Displaying data from Phone table:")
phone_df = spark.sql("SELECT * FROM demo.financialGraph.Phone")
phone_df.show()
# Expected output:
# +------------+---------+
# |      number|isBlocked|
# +------------+---------+
# |718-245-5888|    false|
# |650-658-9867|     true|
# |352-871-8978|    false|
# +------------+---------+


# Transfer table: one row per money transfer between two accounts
# (sender, receiver, transfer date, amount).
transfer_ddl = """
    CREATE TABLE demo.financialGraph.transfer (
        from_account STRING,
        to_account STRING,
        date DATE,
        amount INT
    ) USING iceberg
"""
transfer_rows = """
    INSERT INTO demo.financialGraph.transfer (from_account, to_account, date, amount) VALUES
    ('Scott', 'Ed', CAST('2024-01-04' AS DATE), 20000),
    ('Scott', 'Ed', CAST('2024-02-01' AS DATE), 800),
    ('Scott', 'Ed', CAST('2024-02-14' AS DATE), 500),
    ('Jenny', 'Scott', CAST('2024-04-04' AS DATE), 1000),
    ('Paul', 'Jenny', CAST('2024-02-01' AS DATE), 653),
    ('Steven', 'Jenny', CAST('2024-05-01' AS DATE), 8560),
    ('Ed', 'Paul', CAST('2024-01-04' AS DATE), 1500),
    ('Paul', 'Steven', CAST('2023-05-09' AS DATE), 20000)
"""
spark.sql(transfer_ddl)   # create the Iceberg table
spark.sql(transfer_rows)  # fill it with the sample transfers

# Read the table back so the inserted rows can be checked by eye.
print("Displaying data from Transfer table:")
transfer_df = spark.sql("SELECT * FROM demo.financialGraph.transfer")
transfer_df.show()
# Expected output:
# +------------+----------+----------+------+
# |from_account|to_account|      date|amount|
# +------------+----------+----------+------+
# |       Scott|        Ed|2024-01-04| 20000|
# |       Scott|        Ed|2024-02-01|   800|
# |       Scott|        Ed|2024-02-14|   500|
# |       Jenny|     Scott|2024-04-04|  1000|
# |        Paul|     Jenny|2024-02-01|   653|
# |      Steven|     Jenny|2024-05-01|  8560|
# |          Ed|      Paul|2024-01-04|  1500|
# |        Paul|    Steven|2023-05-09| 20000|
# +------------+----------+----------+------+


# hasPhone table: pairs an account name with a phone number it uses.
has_phone_ddl = """
    CREATE TABLE demo.financialGraph.hasPhone (
        account STRING,
        phone STRING
    ) USING iceberg
"""
has_phone_rows = """
    INSERT INTO demo.financialGraph.hasPhone (account, phone) VALUES
    ('Scott', '718-245-5888'),
    ('Jenny', '718-245-5888'),
    ('Jenny', '650-658-9867'),
    ('Paul', '650-658-9867'),
    ('Ed', '352-871-8978')
"""
spark.sql(has_phone_ddl)   # create the Iceberg table
spark.sql(has_phone_rows)  # fill it with the sample account/phone pairs

# Read the table back so the inserted rows can be checked by eye.
print("Displaying data from hasPhone table:")
has_phone_df = spark.sql("SELECT * FROM demo.financialGraph.hasPhone")
has_phone_df.show()
# Expected output:
# +-------+------------+
# |account|       phone|
# +-------+------------+
# |  Scott|718-245-5888|
# |  Jenny|718-245-5888|
# |  Jenny|650-658-9867|
# |   Paul|650-658-9867|
# |     Ed|352-871-8978|
# +-------+------------+


# isLocatedIn table: pairs an account name with the city it is located in.
is_located_in_ddl = """
    CREATE TABLE demo.financialGraph.isLocatedIn (
        account STRING,
        city STRING
    ) USING iceberg
"""
is_located_in_rows = """
    INSERT INTO demo.financialGraph.isLocatedIn (account, city) VALUES
    ('Scott', 'New York'),
    ('Jenny', 'San Francisco'),
    ('Steven', 'San Francisco'),
    ('Paul', 'Gainesville'),
    ('Ed', 'Gainesville')
"""
spark.sql(is_located_in_ddl)   # create the Iceberg table
spark.sql(is_located_in_rows)  # fill it with the sample account/city pairs

# Read the table back so the inserted rows can be checked by eye.
print("Displaying data from isLocatedIn table:")
is_located_in_df = spark.sql("SELECT * FROM demo.financialGraph.isLocatedIn")
is_located_in_df.show()
# Expected output:
# +-------+-------------+
# |account|         city|
# +-------+-------------+
# |  Scott|     New York|
# |  Jenny|San Francisco|
# | Steven|San Francisco|
# |   Paul|  Gainesville|
# |     Ed|  Gainesville|
# +-------+-------------+

## Define the job to load data from an Iceberg table into the target vertex or edge.
Copy the content below to your container and save it as a file named `load3.gsql`.
```gsql
// Run the statements below against the financialGraph graph.
USE GRAPH financialGraph

// Drop any earlier definition of the job before it is created again below.
DROP JOB load_iceberg

// Loading job that maps the rows of each Iceberg table to a vertex or edge type.
CREATE LOADING JOB load_iceberg  {
 // One FILENAME variable per source table. They are declared without a path;
 // the Spark write step picks one by name through its "loading.filename"
 // option, so the name passed there should match a declaration here.
 DEFINE FILENAME account;
 DEFINE FILENAME phone;
 DEFINE FILENAME city;
 DEFINE FILENAME hasPhone;
 DEFINE FILENAME locatedIn;
 DEFINE FILENAME transferdata;
 // Map each source to its target graph element type through the VALUES clause.
 // $N is the N-th column (0-based) of the source row: for the Account table,
 // $0 is `name` and $1 is `isBlocked`.
 LOAD account TO VERTEX Account VALUES ($0, $1);
 LOAD phone TO VERTEX Phone VALUES ($0, $1);
 LOAD city TO VERTEX City VALUES ($0);
 LOAD hasPhone TO Edge hasPhone VALUES ($0, $1);
 LOAD locatedIn TO Edge isLocatedIn VALUES ($0, $1);
 LOAD transferdata TO Edge transfer VALUES ($0, $1, $2, $3);
}
```
Next, run the following in your container's bash command line.
```bash
gsql load3.gsql
```
Alternatively, copy the content, paste it into the GSQL shell editor of TigerGraph Savanna, and run it there.

---

* The `FILENAME` variables are defined but left unassigned; each one is referenced by name (through the `loading.filename` option) in the Spark write step below.
* The `LOAD` statement maps the data source to the target schema elements by the **column index**, for example:

    For `VERTEX Account ( name STRING PRIMARY KEY, isBlocked BOOL)` and Iceberg table:
    ```
    +------+---------+
    |  name|isBlocked|
    +------+---------+
    | Scott|    false|
    | Jenny|    false|
    +------+---------+
    ```
    The first column (`$0`) is `name` and the second column (`$1`) is `isBlocked`, so the LOAD statement can be defined as `LOAD account TO VERTEX Account VALUES ($0, $1)`.

## Data loading through TigerGraph Spark connector
The TigerGraph Spark Connector employs Apache Spark to read data from a Spark DataFrame (from Iceberg table, or alternative Spark data sources) and write to TigerGraph.

### Prerequisite
Add the JAR of TigerGraph Spark Connector to Spark's `jars` folder. You can download the JAR from [Maven Central](https://central.sonatype.com/artifact/com.tigergraph/tigergraph-spark-connector/versions).

### Define connection options
* Fill in the actual version in "version", e.g., "4.1.0".
* For TigerGraph Savanna users: replace the url with "https://<cloud_domain_name>:443".
* Choose exactly one authentication method: "username" and "password", "secret", or "token".

In [None]:
# Connection options for the TigerGraph Spark Connector.
# Authentication: keep exactly one method -- "username" + "password" (as
# below), "secret", or "token".
connection_opts = {
    # Address of the TigerGraph server. TigerGraph Savanna users use
    # "https://<cloud_domain_name>:443" instead.
    "url": "http://localhost:14240",
    # Placeholder: fill in the actual TigerGraph version, e.g. "4.1.0".
    "version": "<tg_version>",
    # Name of the graph the data is loaded into.
    "graph": "financialGraph",
    "username": "tigergraph",
    "password": "tigergraph",
    # alternative: "secret": "<secret>"
    # alternative: "token": "<JWT>"
}

### Read Iceberg tables as Spark DataFrame and write to TigerGraph

In [None]:
# Pair each Iceberg table with the FILENAME variable of the `load_iceberg`
# loading job that should receive its rows. The second element is passed as
# "loading.filename", so it has to be one of the names declared with
# DEFINE FILENAME in that job: account, phone, city, hasPhone, locatedIn,
# transferdata.
tables = [
    ("demo.financialGraph.Account", "account"),
    ("demo.financialGraph.City", "city"),
    ("demo.financialGraph.Phone", "phone"),
    ("demo.financialGraph.transfer", "transferdata"),
    ("demo.financialGraph.hasPhone", "hasPhone"),
    ("demo.financialGraph.isLocatedIn", "locatedIn"),
]

# Read each table as a DataFrame and write it to TigerGraph through the
# loading job. Vertex tables are listed before the edge tables.
for table_name, filename in tables:
    df = spark.sql(f"SELECT * FROM {table_name}")
    (
        df.write
        .format("tigergraph")
        .mode("append")
        .options(**connection_opts)
        .option("loading.job", "load_iceberg")
        .option("loading.filename", filename)
        # NOTE(review): "|" is used as the column separator; presumably the
        # column values must not contain it -- confirm for other data sets.
        .option("loading.separator", "|")
        .save()
    )
    print(f"Data from {table_name} table has been written to TigerGraph using filename {filename}.")