/
create_gold_tables.py
40 lines (33 loc) · 1.09 KB
/
create_gold_tables.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
from pyspark.sql import SparkSession
def create_tables(
    spark,
    path: str = "s3a://adventureworks/delta",
    database: str = "adventureworks",
) -> None:
    """Create the ``sales_mart`` gold table, replacing any existing definition.

    Ensures ``database`` exists, drops ``{database}.sales_mart`` if it is
    already registered, then re-creates it as a Delta table partitioned by
    the ``partition`` column and stored under ``{path}/sales_mart``.

    Args:
        spark: Session used to run the DDL; only its ``sql`` method is called
            (presumably a ``SparkSession``, as built in this file's
            ``__main__`` block).
        path: Root storage location for Delta tables. A trailing ``/`` is
            ignored so the table location never contains ``//``.
        database: Database (schema) that holds the table.
    """
    # Normalise the root so "s3a://bucket/delta/" and "s3a://bucket/delta"
    # resolve to the same table location string.
    location = f"{str(path).rstrip('/')}/sales_mart"
    table = f"{database}.sales_mart"

    spark.sql(f"CREATE DATABASE IF NOT EXISTS {database}")
    # NOTE(review): the table has an explicit LOCATION; whether DROP TABLE
    # also removes the files stored there depends on the catalog/table type.
    # Confirm before relying on this for a clean re-create.
    spark.sql(f"DROP TABLE IF EXISTS {table}")
    spark.sql(
        f"""
        CREATE TABLE {table} (
            deliver_date DATE,
            state_id STRING,
            num_orders BIGINT,
            etl_inserted TIMESTAMP,
            partition STRING
        ) USING DELTA
        PARTITIONED BY (partition)
        LOCATION '{location}'
        """
    )
def drop_tables(spark, database: str = "adventureworks"):
    """Remove the ``sales_mart`` table and then its enclosing database.

    Both statements use ``IF EXISTS``, so missing objects are not an error.
    The database is dropped without ``CASCADE``.

    Args:
        spark: Session whose ``sql`` method executes each statement.
        database: Name of the database holding ``sales_mart``.
    """
    # The table must go first: the database drop carries no CASCADE.
    teardown = (
        f"DROP TABLE IF EXISTS {database}.sales_mart",
        f"DROP DATABASE IF EXISTS {database}",
    )
    for statement in teardown:
        spark.sql(statement)
if __name__ == "__main__":
    # Script entry point: obtain a Hive-enabled Spark session sized to a
    # single one-core executor, then run the gold-table DDL against it.
    session_builder = SparkSession.builder.appName("adventureworks_ddl")
    for conf_key, conf_value in (
        ("spark.executor.cores", "1"),
        ("spark.executor.instances", "1"),
    ):
        session_builder = session_builder.config(conf_key, conf_value)
    session_builder = session_builder.enableHiveSupport()
    spark = session_builder.getOrCreate()

    create_tables(spark)