
# Tables

---

## S3 sources
- orders
- transactions
- preferences
- facebook

## Bronze tables
- orders_bronze
- transactions_bronze
- preferences_bronze
- facebook_bronze

In [0]:
%run "../../config/setup"

In [0]:
from config.settings import *
import pyspark.sql.functions as F

In [0]:
# display(dbutils.fs.ls(f"{S3_BUCKET_URI}"))

In [0]:

# display(dbutils.fs.ls(f"{ORDERS_PATH}"))

In [0]:
# display(dbutils.fs.ls(f"{TRANSACTIONS_PATH}"))

In [0]:
# display(dbutils.fs.ls(f"{PREFERENCES_PATH}"))

In [0]:
# display(dbutils.fs.ls(f"{FACEBOOK_PATH}"))


### Python code for ingesting orders CSV data from S3

In [0]:
# Load the orders CSV data found at ORDERS_PATH. The first row is treated as
# the header and column types are inferred from the data. Two lineage columns
# are appended to every row: the path of the file the row was read from
# (source_file) and the current timestamp (ingest_ts).
# NOTE(review): with inferSchema the column types depend on file contents;
# confirm the overwrite below behaves as intended if the inferred schema changes.
orders = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv(f"{ORDERS_PATH}")
    .select(
        "*",
        F.col("_metadata.file_path").alias("source_file"),
        F.current_timestamp().alias("ingest_ts"),
    )
)

# Overwrite the orders_bronze Delta table with the rows read above.
(
    orders.write
    .mode("overwrite")
    .format("delta")
    .saveAsTable("orders_bronze")
)

In [0]:
# display(orders)


### SQL code for ingesting orders CSV data from S3 (ingestion cells are commented out; only the GRANT runs)

In [0]:
# %python
# dbutils.widgets.text("orders_path", ORDERS_PATH, "Orders Path")
# dbutils.widgets.text("transactions_path", TRANSACTIONS_PATH, "Transactions Path")
# dbutils.widgets.text("facebook_path", FACEBOOK_PATH, "Facebook Path")
# dbutils.widgets.text("preferences_path", PREFERENCES_PATH, "Preferences Path")

In [0]:
# %sql
# CREATE OR REPLACE TABLE orders_bronze AS
# SELECT *, input_file_name() AS source_file, current_timestamp() AS ingest_ts
#   FROM csv.`${orders_path}`

In [0]:
# %sql
# SELECT * FROM orders_bronze;

In [0]:
%sql
-- Give the devs principal read-only (SELECT) access to orders_bronze.
GRANT SELECT
  ON TABLE orders_bronze
  TO devs;


### Python code for ingesting transactions CSV data from S3

In [0]:
# Load the transactions CSV data found at TRANSACTIONS_PATH. The first row is
# treated as the header and column types are inferred from the data. Every row
# also gets the path of the file it was read from (source_file) and the
# current timestamp (ingest_ts).
transactions = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv(f"{TRANSACTIONS_PATH}")
    .select(
        "*",
        F.col("_metadata.file_path").alias("source_file"),
        F.current_timestamp().alias("ingest_ts"),
    )
)

# Overwrite the transactions_bronze Delta table with the rows read above.
(
    transactions.write
    .mode("overwrite")
    .format("delta")
    .saveAsTable("transactions_bronze")
)

In [0]:
# display(transactions)


### SQL code for ingesting transactions CSV data from S3 (ingestion cell is commented out; only the GRANT runs)

In [0]:
# %sql
# CREATE OR REPLACE TABLE transactions_bronze AS
# SELECT *, input_file_name() AS source_file, current_timestamp() AS ingest_ts
#   FROM csv.`${transactions_path}`;

# SELECT * FROM transactions_bronze;

In [0]:
%sql
-- Give the devs principal read-only (SELECT) access to transactions_bronze.
GRANT SELECT
  ON TABLE transactions_bronze
  TO devs;


### Python code for ingesting Facebook interactions JSON data from S3

In [0]:
# Load the JSON data found at FACEBOOK_PATH and append two lineage columns to
# every row: the path of the file the row was read from (source_file) and the
# current timestamp (ingest_ts).
fb = (
    spark.read
    .json(f"{FACEBOOK_PATH}")
    .select(
        "*",
        F.col("_metadata.file_path").alias("source_file"),
        F.current_timestamp().alias("ingest_ts"),
    )
)

# Overwrite the facebook_bronze Delta table with the rows read above.
(
    fb.write
    .mode("overwrite")
    .format("delta")
    .saveAsTable("facebook_bronze")
)

In [0]:
# display(fb)


### SQL code for ingesting Facebook interactions JSON data from S3 (ingestion cell is commented out; only the GRANT runs)

In [0]:
# %sql
# CREATE OR REPLACE TABLE facebook_bronze AS
# SELECT *, input_file_name() AS source_file, current_timestamp() AS ingest_ts
#   FROM json.`${facebook_path}`;


# SELECT * FROM facebook_bronze;

In [0]:
%sql
-- Give the devs principal read-only (SELECT) access to facebook_bronze.
GRANT SELECT
  ON TABLE facebook_bronze
  TO devs;


### Python code for ingesting preferences JSON data from S3

In [0]:
# Load the JSON data found at PREFERENCES_PATH and append two lineage columns
# to every row: the path of the file the row was read from (source_file) and
# the current timestamp (ingest_ts).
preferences = (
    spark.read
    .json(f"{PREFERENCES_PATH}")
    .select(
        "*",
        F.col("_metadata.file_path").alias("source_file"),
        F.current_timestamp().alias("ingest_ts"),
    )
)

# Overwrite the preferences_bronze Delta table with the rows read above.
(
    preferences.write
    .mode("overwrite")
    .format("delta")
    .saveAsTable("preferences_bronze")
)

In [0]:
# display(preferences)


### SQL code for ingesting preferences JSON data from S3 (ingestion cell is commented out; only the GRANT runs)

In [0]:
# %sql
# CREATE OR REPLACE TABLE preferences_bronze AS
# SELECT *, input_file_name() AS source_file, current_timestamp() AS ingest_ts
#   FROM json.`${preferences_path}`;

# SELECT * FROM preferences_bronze;

In [0]:
%sql
-- Give the devs principal read-only (SELECT) access to preferences_bronze.
GRANT SELECT
  ON TABLE preferences_bronze
  TO devs;