In [None]:
import os

# Glob of extra JAR files handed to Spark via `spark.jars`.
# Override with the SPARK_JARS_PATH environment variable instead of editing the notebook.
SPARK_JARS_PATH = os.environ.get("SPARK_JARS_PATH", "/path/to/spark-jars/*")

# JSON parameters file (source / data / destination sections) that drives this ingestion.
# Override with the PARAMS_PATH environment variable to ingest a different table.
PARAMS_PATH = os.environ.get("PARAMS_PATH", "params-db-Playlist.json")

In [28]:
import json
import yaml

# Getting parameters: the JSON file read here supplies `params` for all later cells.
# JSON is UTF-8 by spec, so decode explicitly rather than with the platform default.
with open(PARAMS_PATH, "r", encoding="utf-8") as f:
  params = json.load(f)

# Echo the parameters for traceability, but mask a non-empty source password so
# credentials are never persisted in the notebook output.
source_section = params.get("source") or {}
params_to_show = (
  {**params, "source": {**source_section, "password": "********"}}
  if source_section.get("password") else params
)

print("\nParameters:\n")
print(yaml.dump(params_to_show, allow_unicode = True, default_flow_style = False, sort_keys = False))


Parameters:

source:
  type: database
  jdbc_driver: org.sqlite.JDBC
  url: jdbc:sqlite:data/Chinook_Sqlite.sqlite
  query:
  - SELECT *
  - FROM Playlist
  user: ''
  password: ''
  execute_before_query: 'SELECT 1 '
data:
  rename:
    PlaylistId: playlist_id
  auto_trim: true
  treat_columns: {}
  schema:
  - col: playlist_id
    type: integer
    comment: Playlist ID
  - col: name
    type: string
    comment: Playlist Name
  py_columns:
  - col: processed_at
    value: current_timestamp()
    comment: Date and time of data processing
destination:
  catalog: default
  warehouse_path: file:///home/rafa-aguiar/Documentos/Projetos/data/write
  database: raw
  table: chinook_playlist
  mode: overwrite
  partition_by: []



In [29]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# SparkSession: local mode on all cores, session time zone pinned to UTC.
# `spark.jars.packages` resolves the Iceberg runtime from its Maven coordinate
# (NOTE(review): the 3.4_2.12 artifact assumes Spark 3.4.x / Scala 2.12 — confirm);
# `spark.jars` adds the local JARs matched by SPARK_JARS_PATH (presumably JDBC drivers).
spark = (
  SparkSession.builder
    .appName("Generic Ingestion")
    .master("local[*]")
    .config("spark.sql.session.timeZone", "UTC")
    .config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.4.2")
    .config("spark.jars", SPARK_JARS_PATH)
    .getOrCreate()
)

In [30]:
## Reading data
# Load the source described by params['source'] into the Spark DataFrame `df`.
# Supported source types: "csv", "database" (JDBC) and "api" (HTTP GET returning JSON).
source_cfg = params['source']

# Echo the source config, masking a non-empty password so credentials are not
# persisted in the notebook output.
source_to_show = {**source_cfg, 'password': '********'} if source_cfg.get('password') else source_cfg
print("\nReading data ...\n")
print(yaml.dump(source_to_show, allow_unicode = True, default_flow_style = False, sort_keys = False))

# Source [CSV]
if source_cfg['type'] == "csv":
  print("\nReading CSV file ...")

  df = spark.read.csv(
      path = source_cfg['path'],
      sep = source_cfg['separator'],
      encoding = source_cfg['encoding'],
      header = source_cfg['header'],
      inferSchema = True
    )

# Source [Database]
elif source_cfg['type'] == "database":
  print("\nReading database ...")
  # The query lines are joined and evaluated as an f-string template, so they may
  # reference notebook variables as {name} (write literal braces as {{ and }}).
  # SECURITY: this relies on eval(), so the params file must be trusted — any
  # {expression} inside it is executed. repr() yields a correctly escaped literal,
  # so double quotes (e.g. quoted identifiers) and backslashes in the SQL no longer
  # break the template or get reinterpreted as escape sequences.
  QUERY = eval("f" + repr(" ".join(source_cfg['query']).strip()))
  print(QUERY)

  # Use the JDBC "query" option instead of wrapping the SQL as "dbtable": Spark then
  # aliases the subquery itself, which many databases (e.g. MySQL, SQL Server,
  # PostgreSQL < 16) require for derived tables in FROM.
  reader = spark.read.format("jdbc") \
    .option("driver", source_cfg['jdbc_driver']) \
    .option("url", source_cfg['url']) \
    .option("query", QUERY) \
    .option("user", source_cfg['user']) \
    .option("password", source_cfg['password'])

  # Optional statement run after each JDBC session opens and before reading;
  # skipped when absent or empty instead of sending an empty statement.
  if source_cfg.get('execute_before_query'):
    reader = reader.option("sessionInitStatement", source_cfg['execute_before_query'])

  df = reader.load()

# Source [API]
elif source_cfg['type'] == "api":
  import requests
  import pandas as pd
  print("\nReading api ...")

  # Extra keyword arguments forwarded to requests.get (headers, params, timeout, ...).
  response = requests.get(
    source_cfg['url'],
    **(source_cfg.get('optional_params') or {})
  )
  # Fail loudly on HTTP errors instead of normalizing an error payload as data.
  response.raise_for_status()
  payload = response.json()

  # Optionally descend into the JSON key that holds the records.
  data_key = source_cfg.get('data_location_key')
  df_pd = pd.json_normalize(payload[data_key] if data_key else payload)

  df = spark.createDataFrame(df_pd)

else:
  # Without this branch an unknown type raised NameError on a fresh kernel or,
  # worse, silently reused a stale `df` from an earlier run.
  raise ValueError(
    f"Unsupported source type {source_cfg['type']!r}; expected 'csv', 'database' or 'api'"
  )

df.cache()


Reading data ...

type: database
jdbc_driver: org.sqlite.JDBC
url: jdbc:sqlite:data/Chinook_Sqlite.sqlite
query:
- SELECT *
- FROM Playlist
user: ''
password: ''
execute_before_query: 'SELECT 1 '


Reading database ...
SELECT * FROM Playlist


DataFrame[PlaylistId: int, Name: string]

In [31]:
# Preview the first 10 rows exactly as read from the source (long values truncated).
df.show(n=10)

+----------+------------+
|PlaylistId|        Name|
+----------+------------+
|         1|       Music|
|         2|      Movies|
|         3|    TV Shows|
|         4|  Audiobooks|
|         5|  90’s Music|
|         6|  Audiobooks|
|         7|      Movies|
|         8|       Music|
|         9|Music Videos|
|        10|    TV Shows|
+----------+------------+
only showing top 10 rows



In [32]:
# Inspect column names and types as read from the source, before the renaming/casting cells run.
df.printSchema()

root
 |-- PlaylistId: integer (nullable = true)
 |-- Name: string (nullable = true)



In [33]:
# Rename columns: apply the explicit old -> new mapping from params first,
# then lower-case every column name in a single projection.
df = df.withColumnsRenamed(params['data']['rename'])
df = df.toDF(*[column_name.lower() for column_name in df.columns])

df.printSchema()

root
 |-- playlist_id: integer (nullable = true)
 |-- name: string (nullable = true)



In [34]:
# Trim: strip leading/trailing whitespace from every string column.
# Done as one withColumns projection; the previous per-column withColumn loop grew
# the query plan and re-derived dict(df.dtypes) on every iteration.
if params['data'].get('auto_trim', False):
  string_columns = [name for name, dtype in df.dtypes if dtype == "string"]
  if string_columns:
    df = df.withColumns({name: trim(df[name]) for name in string_columns})

# Pre treatment: for each configured (column name -> Spark SQL expression) pair,
# set the column to the expression's value. Kept sequential on purpose so an
# expression can reference a column treated before it.
for treat_column_name, treat_column_value in params['data'].get('treat_columns', {}).items():
  df = df.withColumn(treat_column_name, expr(treat_column_value))

In [35]:
# Cast: convert each configured column to its target Spark SQL type and attach
# the configured description as the column's 'comment' metadata.
for spec in params['data']['schema']:
  target = spec['col']
  casted = col(target).cast(spec['type'])
  df = df.withColumn(target, casted).withMetadata(target, {'comment': spec['comment']})

In [36]:
# Preview the first 10 rows after renaming, trimming and casting.
df.show(n=10)

+-----------+------------+
|playlist_id|        name|
+-----------+------------+
|          1|       Music|
|          2|      Movies|
|          3|    TV Shows|
|          4|  Audiobooks|
|          5|  90’s Music|
|          6|  Audiobooks|
|          7|      Movies|
|          8|       Music|
|          9|Music Videos|
|         10|    TV Shows|
+-----------+------------+
only showing top 10 rows



In [37]:
# Add derived columns: each entry's 'value' is a Spark SQL expression
# (e.g. processed_at = current_timestamp()), documented via 'comment' metadata.
for derived in params['data']['py_columns']:
  derived_name = derived['col']
  derived_expr = expr(derived['value'])
  df = df.withColumn(derived_name, derived_expr).withMetadata(derived_name, {'comment': derived['comment']})

In [38]:
# List every column with its Spark type and its 'comment' metadata (with a fallback text when absent).
for schema_field in df.schema.fields:
  column_comment = schema_field.metadata.get('comment', 'Nenhum comentário')
  print(f"Coluna: {schema_field.name}, Tipo: {schema_field.dataType}, Comentário: {column_comment}")

Coluna: playlist_id, Tipo: IntegerType(), Comentário: Playlist ID
Coluna: name, Tipo: StringType(), Comentário: Playlist Name
Coluna: processed_at, Tipo: TimestampType(), Comentário: Date and time of data processing


In [39]:
## Save
destination = params['destination']
print("\nSaving data ...\n")
print(yaml.dump(destination, allow_unicode = True, default_flow_style = False, sort_keys = False))

catalog = destination['catalog']
warehouse = destination['warehouse_path']

# Register an Iceberg catalog of type "hadoop" named `catalog`, rooted at `warehouse`.
catalog_settings = {
  f"spark.sql.catalog.{catalog}": "org.apache.iceberg.spark.SparkCatalog",
  f"spark.sql.catalog.{catalog}.type": "hadoop",
  f"spark.sql.catalog.{catalog}.warehouse": warehouse,
}
for setting_key, setting_value in catalog_settings.items():
  spark.conf.set(setting_key, setting_value)

# Write as <catalog>.<database>.<table>; mode defaults to "overwrite" and there is
# no partitioning unless configured. On the first write Iceberg may log a WARN about
# a missing metadata/version-hint.text because the table does not exist yet.
target_table = f"{catalog}.{destination['database']}.{destination['table']}"
(
  df.write
    .format("iceberg")
    .mode(destination.get("mode", "overwrite"))
    .partitionBy(*destination.get("partition_by", []))
    .saveAsTable(target_table)
)


Saving data ...

catalog: default
warehouse_path: file:///home/rafa-aguiar/Documentos/Projetos/data/write
database: raw
table: chinook_playlist
mode: overwrite
partition_by: []



25/04/20 15:32:34 WARN HadoopTableOperations: Error reading version hint file file:/home/rafa-aguiar/Documentos/Projetos/data/write/raw/chinook_playlist/metadata/version-hint.text
java.io.FileNotFoundException: File file:/home/rafa-aguiar/Documentos/Projetos/data/write/raw/chinook_playlist/metadata/version-hint.text does not exist
	at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:779)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:1100)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:769)
	at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:462)
	at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.<init>(ChecksumFileSystem.java:160)
	at org.apache.hadoop.fs.ChecksumFileSystem.open(ChecksumFileSystem.java:372)
	at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:976)
	at org.apache.iceberg.hadoop.HadoopTableOperations.findV

In [40]:
# Read the destination Iceberg table back to verify the write.
destination_cfg = params['destination']
spark.table(f"{destination_cfg['catalog']}.{destination_cfg['database']}.{destination_cfg['table']}").show()

[Stage 10:>                                                         (0 + 1) / 1]

+-----------+--------------------+--------------------+
|playlist_id|                name|        processed_at|
+-----------+--------------------+--------------------+
|          1|               Music|2025-04-20 18:32:...|
|          2|              Movies|2025-04-20 18:32:...|
|          3|            TV Shows|2025-04-20 18:32:...|
|          4|          Audiobooks|2025-04-20 18:32:...|
|          5|          90’s Music|2025-04-20 18:32:...|
|          6|          Audiobooks|2025-04-20 18:32:...|
|          7|              Movies|2025-04-20 18:32:...|
|          8|               Music|2025-04-20 18:32:...|
|          9|        Music Videos|2025-04-20 18:32:...|
|         10|            TV Shows|2025-04-20 18:32:...|
|         11|     Brazilian Music|2025-04-20 18:32:...|
|         12|           Classical|2025-04-20 18:32:...|
|         13|Classical 101 - D...|2025-04-20 18:32:...|
|         14|Classical 101 - N...|2025-04-20 18:32:...|
|         15|Classical 101 - T...|2025-04-20 18:

                                                                                