-
Notifications
You must be signed in to change notification settings - Fork 1
/
load_ice_transactions_pyspark.py
43 lines (35 loc) · 1.97 KB
/
load_ice_transactions_pyspark.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
"""Load JSON transaction records into an Iceberg table with Spark.

Reads ``/opt/spark/input/transactions.json``, prints the inferred schema
and contents, then appends the rows to the Iceberg table
``icecatalog.icecatalog.transactions`` created in an earlier step.
"""
import os

from pyspark.sql import SparkSession


def main() -> None:
    """Build the Spark session, load the JSON file, and append to Iceberg."""
    # NOTE(review): credentials were hard-coded in source; read them from the
    # environment with the original literals as fallbacks so existing runs
    # keep working unchanged.
    jdbc_user = os.environ.get("ICECATALOG_JDBC_USER", "icecatalog")
    jdbc_password = os.environ.get("ICECATALOG_JDBC_PASSWORD", "supersecret1")

    spark = (
        SparkSession.builder
        .appName("Python Spark SQL example")
        # Iceberg Spark runtime plus the AWS SDK pieces needed by S3FileIO.
        .config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.1.0,software.amazon.awssdk:bundle:2.19.19,software.amazon.awssdk:url-connection-client:2.19.19")
        .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
        # "icecatalog" catalog: JDBC-backed Iceberg catalog in Postgres,
        # data files stored in MinIO/S3 at the endpoint below.
        .config("spark.sql.catalog.icecatalog", "org.apache.iceberg.spark.SparkCatalog")
        .config("spark.sql.catalog.icecatalog.catalog-impl", "org.apache.iceberg.jdbc.JdbcCatalog")
        .config("spark.sql.catalog.icecatalog.uri", "jdbc:postgresql://127.0.0.1:5432/icecatalog")
        .config("spark.sql.catalog.icecatalog.jdbc.user", jdbc_user)
        .config("spark.sql.catalog.icecatalog.jdbc.password", jdbc_password)
        .config("spark.sql.catalog.icecatalog.warehouse", "s3://iceberg-data")
        .config("spark.sql.catalog.icecatalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
        .config("spark.sql.catalog.icecatalog.s3.endpoint", "http://127.0.0.1:9000")
        .config("spark.sql.catalog.sparkcatalog", "icecatalog")
        # Event logging so runs show up in the Spark history server.
        .config("spark.eventLog.enabled", "true")
        .config("spark.eventLog.dir", "/opt/spark/spark-events")
        .config("spark.history.fs.logDirectory", "/opt/spark/spark-events")
        .config("spark.sql.catalogImplementation", "in-memory")
        .getOrCreate()
    )
    try:
        # A JSON dataset is pointed to by 'path'; schema is inferred on read.
        path = "/opt/spark/input/transactions.json"
        transactionsDF = spark.read.json(path)
        # Visualize the inferred schema and the data in this CLI.
        transactionsDF.printSchema()
        transactionsDF.show()
        # Append to the table created in an earlier step.
        transactionsDF.writeTo("icecatalog.icecatalog.transactions").append()
    finally:
        # Always release the session, even if the read/write fails.
        spark.stop()


if __name__ == "__main__":
    main()