In [7]:
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.types import StructField, StructType, StringType, LongType, IntegerType
from pyspark.sql.functions import to_timestamp
import os

In [2]:
# CSV inputs located in the dbt project's data directory. The path is
# relative, so it resolves against the notebook's current working directory.
root_path = "my_dbt_project/data"
customer, orders, state = (f"{root_path}/{stem}.csv" for stem in ("customer", "orders", "state"))

In [3]:
# Run Spark in-process (local[4]) when True. Otherwise start executors on
# Kubernetes, which requires IMAGE_NAME, KUBERNETES_SERVICE_ACCOUNT and
# KUBERNETES_NAMESPACE to be set in the environment.
local = True

# Settings shared by both deployment modes. Declaring them once stops the app
# name and the PostgreSQL JDBC driver version (needed by the write.jdbc cells)
# from drifting apart between the local and Kubernetes branches.
app_name = "Load_csv_to_postgres"
postgres_jdbc_package = "org.postgresql:postgresql:42.2.24"

spark_builder = SparkSession.builder.appName(app_name).config("spark.jars.packages", postgres_jdbc_package)

if local:
    spark_builder = spark_builder.master("local[4]")
else:
    spark_builder = (
        spark_builder
        .master("k8s://https://kubernetes.default.svc:443")
        .config("spark.kubernetes.container.image", os.environ["IMAGE_NAME"])
        .config("spark.kubernetes.authenticate.driver.serviceAccountName", os.environ["KUBERNETES_SERVICE_ACCOUNT"])
        .config("spark.kubernetes.namespace", os.environ["KUBERNETES_NAMESPACE"])
        .config("spark.executor.instances", "4")
        .config("spark.executor.memory", "8g")
    )

spark = spark_builder.getOrCreate()


22/01/27 22:39:05 WARN Utils: Your hostname, ubuntu resolves to a loopback address: 127.0.1.1; using 192.168.184.146 instead (on interface ens33)
22/01/27 22:39:05 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/opt/spark/spark-3.1.2/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/pliu/.ivy2/cache
The jars for the packages stored in: /home/pliu/.ivy2/jars
org.postgresql#postgresql added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-6e0725ff-fda0-45cf-9d20-cd1ad935879b;1.0
	confs: [default]
	found org.postgresql#postgresql;42.2.24 in central
	found org.checkerframework#checker-qual;3.5.0 in central
downloading https://repo1.maven.org/maven2/org/postgresql/postgresql/42.2.24/postgresql-42.2.24.jar ...
	[SUCCESSFUL ] org.postgresql#postgresql;42.2.24!postgresql.jar (166ms)
downloading https://repo1.maven.org/maven2/org/checkerframework/checker-qual/3.5.0/checker-qual-3.5.0.jar ...
	[SUCCESSFUL ] org.checkerframework#checker-qual;3.5.0!checker-qual.jar (30ms)
:: resolution report :: resolve 1954ms :: artifacts dl 202ms
	:: modules in use:
	org.checkerframework#checker-qual;3.5.0 from central in [default]
	org.postgresql#postgresql;42.2.24 from central in [default]
	----------------------------------

# 0. Set up the PostgreSQL connection

In [None]:
# PostgreSQL connection config.
# Each setting can be overridden with an environment variable, so real
# credentials never have to be committed with the notebook. The defaults
# reproduce the original local development setup.
db_name = os.environ.get("POSTGRES_DB", "dbt_project")
host_name = os.environ.get("POSTGRES_HOST", "127.0.0.1")
db_port = os.environ.get("POSTGRES_PORT", "5432")
db_url = f"jdbc:postgresql://{host_name}:{db_port}/{db_name}"
user = os.environ.get("POSTGRES_USER", "pliu")
password = os.environ.get("POSTGRES_PASSWORD", "changeMe")
driver = "org.postgresql.Driver"
# Change the driver class (and the JDBC package configured on the Spark
# session) if you target another database, e.g.
#     MySQL:      com.mysql.jdbc.Driver
#     PostgreSQL: org.postgresql.Driver
db_properties = {"user": user, "password": password, "driver": driver}

# 1. Load customer CSV into table `customers`

In [4]:

# Read the customer CSV: the first line is the header, column types are
# inferred by Spark, and fields are comma-separated.
customer_df = spark.read.csv(customer, header=True, inferSchema=True, sep=",")
customer_df.show()

+-----------+-------+--------------------+----------+-------------------+-------------------+
|customer_id|zipcode|                city|state_code|   datetime_created|   datetime_updated|
+-----------+-------+--------------------+----------+-------------------+-------------------+
|          1|  14409|              franca|        SP|2017-10-18 00:00:00|2017-10-18 00:00:00|
|          2|   9790|sao bernardo do c...|        SP|2017-10-18 00:00:00|2017-10-18 00:00:00|
|          3|   1151|           sao paulo|        SP|2017-10-18 00:00:00|2017-10-18 00:00:00|
|          4|   8775|     mogi das cruzes|        SP|2017-10-18 00:00:00|2017-10-18 00:00:00|
|          5|  13056|            campinas|        SP|2017-10-18 00:00:00|2017-10-18 00:00:00|
|          6|  89254|      jaragua do sul|        SC|2017-10-18 00:00:00|2017-10-18 00:00:00|
|          7|   4534|           sao paulo|        SP|2017-10-18 00:00:00|2017-10-18 00:00:00|
|          8|  35182|             timoteo|        MG|2017-10

In [5]:
# Inspect the inferred schema. inferSchema typed customer_id/zipcode as integers but left both datetime columns as strings.
customer_df.printSchema()

root
 |-- customer_id: integer (nullable = true)
 |-- zipcode: integer (nullable = true)
 |-- city: string (nullable = true)
 |-- state_code: string (nullable = true)
 |-- datetime_created: string (nullable = true)
 |-- datetime_updated: string (nullable = true)



In [8]:
# Parse the string datetime columns into Spark timestamps under new names.
# The raw string columns are kept alongside the parsed ones.
customer_ts_format = "yyyy-MM-dd HH:mm:ss"
customer_ts_columns = {
    "creation_date": "datetime_created",
    "update_date": "datetime_updated",
}

clean_customer_df = customer_df
for typed_col, raw_col in customer_ts_columns.items():
    clean_customer_df = clean_customer_df.withColumn(typed_col, to_timestamp(raw_col, customer_ts_format))

clean_customer_df.show()

+-----------+-------+--------------------+----------+-------------------+-------------------+-------------------+-------------------+
|customer_id|zipcode|                city|state_code|   datetime_created|   datetime_updated|      creation_date|        update_date|
+-----------+-------+--------------------+----------+-------------------+-------------------+-------------------+-------------------+
|          1|  14409|              franca|        SP|2017-10-18 00:00:00|2017-10-18 00:00:00|2017-10-18 00:00:00|2017-10-18 00:00:00|
|          2|   9790|sao bernardo do c...|        SP|2017-10-18 00:00:00|2017-10-18 00:00:00|2017-10-18 00:00:00|2017-10-18 00:00:00|
|          3|   1151|           sao paulo|        SP|2017-10-18 00:00:00|2017-10-18 00:00:00|2017-10-18 00:00:00|2017-10-18 00:00:00|
|          4|   8775|     mogi das cruzes|        SP|2017-10-18 00:00:00|2017-10-18 00:00:00|2017-10-18 00:00:00|2017-10-18 00:00:00|
|          5|  13056|            campinas|        SP|2017-10-1

In [9]:
# Confirm that creation_date/update_date are timestamps, while the raw datetime_* columns are still strings.
clean_customer_df.printSchema()

root
 |-- customer_id: integer (nullable = true)
 |-- zipcode: integer (nullable = true)
 |-- city: string (nullable = true)
 |-- state_code: string (nullable = true)
 |-- datetime_created: string (nullable = true)
 |-- datetime_updated: string (nullable = true)
 |-- creation_date: timestamp (nullable = true)
 |-- update_date: timestamp (nullable = true)



In [24]:
# Save the cleaned customers to PostgreSQL as table "customers", replacing any existing table.
clean_customer_df.write.mode("overwrite").jdbc(db_url, "customers", properties=db_properties)

You can inspect the generated `customers` table in PostgreSQL. The `datetime_created` column has type `text`, because it is a string column in the DataFrame. The `creation_date` column has type `timestamp`, because it is a timestamp column in the DataFrame.

# 2. Load orders CSV into table `orders`

In [14]:
# Read the orders CSV with a header row, inferred column types and comma separators.
order_df = spark.read.csv(orders, header=True, inferSchema=True, sep=",")
order_df.show()

+--------------------+-----------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|            order_id|customer_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|
+--------------------+-----------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|e481f51cbdc54678b...|         69|   delivered|     2017-10-02 10:56:33|2017-10-02 11:07:15|         2017-10-04 19:55:00|          2017-10-10 21:25:13|          2017-10-18 00:00:00|
|53cdb2fc8bc7dce0b...|         17|   delivered|     2018-07-24 20:41:37|2018-07-26 03:24:27|         2018-07-26 14:31:00|          2018-08-07 15:27:45|          2018-08-13 00:00:00|
|47770eb9100c2d0c4...|         26|   delivered|     2018-08-08 08:38:49|2018-08-08 08:55:2

In [15]:
# Inspect the inferred schema. All five order date/timestamp columns were read as strings.
order_df.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: string (nullable = true)
 |-- order_approved_at: string (nullable = true)
 |-- order_delivered_carrier_date: string (nullable = true)
 |-- order_delivered_customer_date: string (nullable = true)
 |-- order_estimated_delivery_date: string (nullable = true)



In [18]:
# Convert every textual order date into a Spark timestamp under a shorter
# column name. Then drop the original string columns so only the typed
# versions are written to PostgreSQL.
order_ts_format = "yyyy-MM-dd HH:mm:ss"
order_ts_renames = {
    "order_purchase_timestamp": "order_purchase_date",
    "order_approved_at": "order_approved_date",
    "order_delivered_carrier_date": "order_deli_carrier_date",
    "order_delivered_customer_date": "order_deli_customer_date",
    "order_estimated_delivery_date": "order_estimated_deli_date",
}

typed_order_df = order_df
for raw_col, typed_col in order_ts_renames.items():
    typed_order_df = typed_order_df.withColumn(typed_col, to_timestamp(raw_col, order_ts_format))
clean_order_df = typed_order_df.drop(*order_ts_renames)

clean_order_df.show()

+--------------------+-----------+------------+-------------------+-------------------+-----------------------+------------------------+-------------------------+
|            order_id|customer_id|order_status|order_purchase_date|order_approved_date|order_deli_carrier_date|order_deli_customer_date|order_estimated_deli_date|
+--------------------+-----------+------------+-------------------+-------------------+-----------------------+------------------------+-------------------------+
|e481f51cbdc54678b...|         69|   delivered|2017-10-02 10:56:33|2017-10-02 11:07:15|    2017-10-04 19:55:00|     2017-10-10 21:25:13|      2017-10-18 00:00:00|
|53cdb2fc8bc7dce0b...|         17|   delivered|2018-07-24 20:41:37|2018-07-26 03:24:27|    2018-07-26 14:31:00|     2018-08-07 15:27:45|      2018-08-13 00:00:00|
|47770eb9100c2d0c4...|         26|   delivered|2018-08-08 08:38:49|2018-08-08 08:55:23|    2018-08-08 13:50:00|     2018-08-17 18:06:29|      2018-09-04 00:00:00|
|949d5b44dbf5de918...|

In [19]:
# Confirm that all five order date columns are now timestamps and the original string columns are gone.
clean_order_df.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_date: timestamp (nullable = true)
 |-- order_approved_date: timestamp (nullable = true)
 |-- order_deli_carrier_date: timestamp (nullable = true)
 |-- order_deli_customer_date: timestamp (nullable = true)
 |-- order_estimated_deli_date: timestamp (nullable = true)



In [23]:
# Save the cleaned orders to PostgreSQL as table "orders", replacing any existing table.
clean_order_df.write.mode("overwrite").jdbc(db_url, "orders", properties=db_properties)

# 3. Load state CSV into table `states`

In [21]:
# Read the state lookup CSV with a header row, inferred column types and comma separators.
state_df = spark.read.csv(state, header=True, inferSchema=True, sep=",")
state_df.show()

+--------+----------+-------------------+
|state_id|state_code|         state_name|
+--------+----------+-------------------+
|       1|        AC|               Acre|
|       2|        AL|            Alagoas|
|       3|        AP|              Amapa|
|       4|        AM|           Amazonas|
|       5|        BA|              Bahia|
|       6|        CE|              Ceara|
|       7|        DF|   Distrito Federal|
|       8|        ES|     Espirito Santo|
|       9|        GO|              Goias|
|      10|        MA|           Maranhao|
|      11|        MT|         MatoGrosso|
|      12|        MS|  MatoGrosso do Sul|
|      13|        MG|       Minas Gerais|
|      14|        PA|               Para|
|      15|        PB|            Paraiba|
|      16|        PR|             Parana|
|      17|        PE|         Pernambuco|
|      18|        PI|              Piaui|
|      19|        RJ|     Rio de Janeiro|
|      20|        RN|Rio Grande do Norte|
+--------+----------+-------------

In [22]:
# Save the state lookup to PostgreSQL as table "states", replacing any existing table.
state_df.write.mode("overwrite").jdbc(db_url, "states", properties=db_properties)