In [1]:
import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession

In [3]:
appName = "load_parquet"
master = "local"

In [4]:
spark = SparkSession.builder \
        .master(master) \
        .appName(appName) \
        .getOrCreate()

21/09/10 06:13:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


## Read in Parquet

In [5]:
customers_sdf = spark.read.parquet('/home/jovyan/filesystem/customers.parquet')
orders_sdf = spark.read.parquet('/home/jovyan/filesystem/orders.parquet')
geolocation_sdf = spark.read.parquet('/home/jovyan/filesystem/geolocation.parquet')

                                                                                

In [7]:
customers_sdf.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- customer_unique_id: string (nullable = true)
 |-- customer_zip_code_prefix: string (nullable = true)
 |-- customer_city: string (nullable = true)
 |-- customer_state: string (nullable = true)



In [8]:
orders_sdf.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: timestamp (nullable = true)
 |-- order_approved_at: timestamp (nullable = true)
 |-- order_delivered_carrier_date: timestamp (nullable = true)
 |-- order_delivered_customer_date: timestamp (nullable = true)
 |-- order_estimated_delivery_date: timestamp (nullable = true)



In [9]:
geolocation_sdf.printSchema()

root
 |-- geolocation_zip_code_prefix: string (nullable = true)
 |-- geolocation_lat: double (nullable = true)
 |-- geolocation_lng: double (nullable = true)
 |-- geolocation_city: string (nullable = true)
 |-- geolocation_state: string (nullable = true)



## Load to Postgres

- Have to download PostgreSQL JDBC driver from [here](https://jdbc.postgresql.org/download.html) and place it in the `/usr/local/spark/jar/` folder. Before opening the Jupyter notebook, copy over the jar file

    `docker cp drivers/postgresql-42.2.23.jar dbt-with-postgres_pyspark-notebook_1:/usr/local/spark/jars/`

- Schema (as in Postgres schema) has to exist beforehand. Here, only writing to `public` schema is allowed.

- After writing to Postgres database, the schema (as in column types for the table) are not preserved.

- To preserve schema, add an additional option for `createTableColumnTypes` or `createTableOptions`. This means there is no need to create the tables beforehand but the schema has to be specified here.

In [21]:
# alternative method
customers_sdf.write \
    .jdbc(url="jdbc:postgresql://postgres-dest:5432/destdb", table="public.customers", properties={"user": "destdb1", "password": "destdb1"})

Py4JJavaError: An error occurred while calling o104.jdbc.
: java.sql.SQLException: No suitable driver
	at java.sql/java.sql.DriverManager.getDriver(DriverManager.java:298)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$2(JDBCOptions.scala:108)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:108)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:217)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:221)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:45)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:132)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:131)
	at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:301)
	at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:817)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:829)


These do not preserve schema:

In [20]:
customers_sdf.write \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://postgres-dest:5432/destdb") \
    .option("dbtable", "public.customers") \
    .option("user", "destdb1") \
    .option("password", "destdb1") \
    .save()

Py4JJavaError: An error occurred while calling o102.save.
: java.sql.SQLException: No suitable driver
	at java.sql/java.sql.DriverManager.getDriver(DriverManager.java:298)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$2(JDBCOptions.scala:108)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:108)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:217)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:221)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:45)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:132)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:131)
	at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:301)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:829)


In [15]:
orders_sdf.write \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://postgres-dest:5432/destdb") \
    .option("dbtable", "public.orders") \
    .option("user", "destdb1") \
    .option("password", "destdb1") \
    .save()

                                                                                

In [16]:
geolocation_sdf.write \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://postgres-dest:5432/destdb") \
    .option("dbtable", "public.geolocation") \
    .option("user", "destdb1") \
    .option("password", "destdb1") \
    .save()

                                                                                

### Create Schema

In [21]:
print(customers_sdf.dtypes)

[('customer_id', 'string'), ('customer_unique_id', 'string'), ('customer_zip_code_prefix', 'string'), ('customer_city', 'string'), ('customer_state', 'string')]


In [22]:
customers_cols = [*map(lambda c: c[0], customers_sdf.dtypes)]
orders_cols = [*map(lambda c: c[0], orders_sdf.dtypes)]
geolocation_cols = [*map(lambda c: c[0], geolocation_sdf.dtypes)]

`TEXT` data type is not supported in Spark

In [23]:
customers_coltypes = "VARCHAR(128) VARCHAR(128) CHAR(5) VARCHAR(128) CHAR(2)".split(" ")
orders_coltypes = "VARCHAR(128) VARCHAR(128) VARCHAR(128) TIMESTAMP TIMESTAMP TIMESTAMP TIMESTAMP TIMESTAMP".split(" ")
geolocation_coltypes = "CHAR(5) FLOAT FLOAT VARCHAR(128) CHAR(2)".split(" ")

In [24]:
customers_schema = []
orders_schema = []
geolocation_schema = []

In [25]:
for col, coltype in zip(customers_cols, customers_coltypes):
    customers_schema.append(col + " " + coltype)
    
for col, coltype in zip(orders_cols, orders_coltypes):
    orders_schema.append(col + " " + coltype)
    
for col, coltype in zip(geolocation_cols, geolocation_coltypes):
    geolocation_schema.append(col + " " + coltype)

### Load to Postgres

These will use the schema specified in the previous section

In [10]:
customers_sdf.write \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://postgres-dest:5432/destdb") \
    .option("createTableColumnTypes", ', '.join(customers_schema)) \
    .option("dbtable", "public.customers") \
    .option("user", "destdb1") \
    .option("password", "destdb1") \
    .save()

                                                                                

In [11]:
orders_sdf.write \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://postgres-dest:5432/destdb") \
    .option("createTableColumnTypes", ', '.join(orders_schema)) \
    .option("dbtable", "public.orders") \
    .option("user", "destdb1") \
    .option("password", "destdb1") \
    .save()

                                                                                

In [26]:
geolocation_sdf.write \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://postgres-dest:5432/destdb") \
    .option("createTableColumnTypes", ', '.join(geolocation_schema)) \
    .option("dbtable", "public.geolocation") \
    .option("user", "destdb1") \
    .option("password", "destdb1") \
    .save()

                                                                                