**PySpark Data Manipulation**
---

In [1]:
from pyspark.sql import SparkSession
session = SparkSession.builder.getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/05/24 22:10:41 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


**1. RDDs**

RDDs (Resilient Distributed Datasets) are one of the most important data structures in Spark and the basis of dataframes. You can think of them as “distributed” arrays. In many respects they behave like lists, with a few differences we’ll discuss below.

In [2]:
# Create an RDD

rdd = session.sparkContext.parallelize([1,2,3])

In [3]:
# Interact with your RDD. This brings the first two values of the RDD to the driver.
rdd.take(num=2)

[1, 2]

In [4]:
# .count() returns the number of elements in the RDD
rdd.count()

3

In [5]:
rdd.collect()

[1, 2, 3]

**2. Dataframes**

If you know `pandas` or R dataframes, you already have a good idea of what `Spark dataframes` are: tabular data with named columns. Under the hood they are implemented as an RDD of Row objects, which are somewhat similar to Python named tuples. Their greatest strength is that they let you put your SQL thinking straight into action. We’ll talk about dataframe manipulation later, but let’s start by creating a dataframe so you can play with it.

In [7]:
df = session.createDataFrame(
  [[1,2,3], [4,5,6]], ['column1', 'column2', 'column3']
)

#The data matrix comes first and the column names later.

In [8]:
df.show(n=3)
#It will print a table representation of the dataframe with the first n rows.

+-------+-------+-------+
|column1|column2|column3|
+-------+-------+-------+
|      1|      2|      3|
|      4|      5|      6|
+-------+-------+-------+



**3. Immutability**

`One key difference` from `Python lists` is that `RDDs (and dataframes) are immutable.` Immutable data is often required in concurrent applications and functional languages.

In [9]:
# say you do this in python
a = list(range(10))
a.append(11)

In [11]:
# Python creates the list 'a' and then appends 11 to that same list, in place
a

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 11]

In [13]:
# Strings, by contrast, are immutable: += builds a new string and rebinds the name
st = 'my string'
st += 'is pretty'

In [14]:
st

'my stringis pretty'

In [16]:
# In the same way, RDDs and dataframes can’t be modified in place, so when you do
rdd.map(lambda x: x*100)
# rdd stays the same; map returns a new RDD.

PythonRDD[12] at RDD at PythonRDD.scala:53

In [19]:
# To keep the result, assign the new RDD to a name yourself
my_rdd = rdd.map(lambda x: x*100)

### Transformations and actions
A more surprising difference from ordinary Python might confuse you at first. Sometimes a very heavy operation appears to finish instantly, but later you do something small (like printing the first value of the RDD) and it seems to take forever.

In Spark there’s a distinction between `transformations` and `actions`. `Deriving a new dataframe from an existing one is a transformation`, whereas `actually consuming the data (e.g. df.show(1)) is an action`. `Transformations are lazily evaluated`: they don’t run when you call them, only when an action consumes their results.
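The lazy behaviour can be imitated in plain Python with generators. The sketch below is only an analogy, not Spark code: `heavy`, `lazy_map` and the `calls` list are hypothetical names introduced here. Building the pipeline does no work; the work happens only when something consumes the results, which plays the role of an action.

```python
# Plain-Python analogy for lazy transformations (not Spark code).
calls = []  # records every time the "heavy" function really runs


def heavy(x):
    calls.append(x)
    return x * 100


def lazy_map(func, data):
    """Return a generator: nothing is computed until it is consumed."""
    return (func(item) for item in data)


pipeline = lazy_map(heavy, [1, 2, 3])  # "transformation": returns instantly
calls_before_action = list(calls)      # still empty, no work done yet

first = next(pipeline)                 # "action": forces one element
calls_after_action = list(calls)       # only the first element was computed
```

Spark plans and schedules its work very differently, but the timing you observe is similar: the cost shows up at the point of consumption, not at the point of definition.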

In [20]:
df.show()

+-------+-------+-------+
|column1|column2|column3|
+-------+-------+-------+
|      1|      2|      3|
|      4|      5|      6|
+-------+-------+-------+



### Dataframe manipulation

User Defined Functions (UDFs) let you use Python code to operate on dataframe cells.\
`You create a regular Python function, wrap it in a UDF object and pass it to Spark`, which takes care of making your function available on all the workers and scheduling its execution to transform the data.

In [21]:
import pyspark.sql.functions as funcs
import pyspark.sql.types as types
def multiply_by_ten(number):
    return number*10.0

In [22]:
multiply_udf = funcs.udf(multiply_by_ten, types.DoubleType())

In [23]:
transformed_df = df.withColumn(
    'multiplied', multiply_udf('column1'))

In [24]:
transformed_df.show()

+-------+-------+-------+----------+
|column1|column2|column3|multiplied|
+-------+-------+-------+----------+
|      1|      2|      3|      10.0|
|      4|      5|      6|      40.0|
+-------+-------+-------+----------+



                                                                                

First you create a Python function (a method on an object works too, since that is also a function). Then you create a UDF object. The annoying part is that you need to declare the output type, something we aren’t really used to in Python. To be really effective with UDFs you’ll need to learn those types, especially the composite MapType (like dictionaries) and ArrayType (like lists). The benefit is that you can then pass this UDF to the dataframe, tell it which column to operate on, and get fantastic things done without leaving the comfort of your old Python.

One of the main limitations of UDFs, though, is that although `they can take several columns as input`, they return a single column value and can’t change the row as a whole. If you want to work with the whole row, you’ll need the RDD map.

### RDD mapping

It’s a lot like using a UDF: you also pass a regular Python function. In this case, though, the function receives a full Row object instead of column values, and it is expected to return a full Row as well. This gives you ultimate power over your rows, with a couple of caveats.

`First: Row objects are immutable, so you need to create a whole new Row and return it.`\
`Second: you need to convert the dataframe to an RDD and back again.` Fortunately, neither of these problems is hard to overcome.
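Because a Row behaves much like a named tuple, the "build a new row instead of editing the old one" pattern can be tried without Spark. This is a sketch that uses `collections.namedtuple` as a stand-in; `FakeRow` is a hypothetical name, and real Row objects come from PySpark, not from this code.

```python
from collections import namedtuple

# Stand-in for a Spark Row (hypothetical; a real Row comes from pyspark.sql).
FakeRow = namedtuple("FakeRow", ["column1", "column2", "column3"])

row = FakeRow(1, 2, 3)

# In-place modification is rejected, as it is for an immutable Row.
try:
    row.column1 = 10
    modified_in_place = True
except AttributeError:
    modified_in_place = False

# Instead, go through a dict and build a brand-new object.
as_dict = row._asdict()
as_dict["column1"] = as_dict["column1"] * 10
new_row = FakeRow(**as_dict)
```

The original `row` is untouched and `new_row` carries the change, which is the same round trip (row to dict to new row) that a mapping function over Rows has to perform.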

In [25]:
import math
import pyspark.sql.types as types

def take_log_in_all_columns(row: types.Row):
    # Rows are immutable, so build a dict and return a brand-new Row
    old_row = row.asDict()
    new_row = {f'log({column_name})': math.log(value)
               for column_name, value in old_row.items()}
    return types.Row(**new_row)

In [26]:
# map is lazy, but toDF() has to look at the data to infer the schema, so this line already runs a job.
logarithmic_dataframe = df.rdd.map(take_log_in_all_columns).toDF()


### SQL operations

Since dataframes represent tables, they naturally come with SQL-like operations. I’ll mention just a few of them to get you excited, but you can expect to find an equivalent for almost everything SQL offers.
Calling select returns a dataframe with only some of the original columns.

In [27]:
df.select('column1', 'column2')

DataFrame[column1: bigint, column2: bigint]

In [29]:
# This call to where will return a dataframe with only the rows where the value for column1 is 3.

df.where('column1 = 3')

DataFrame[column1: bigint, column2: bigint, column3: bigint]

This call to join returns a dataframe that is, well…, a join of df with itself on column1, the same way INNER JOIN would work in SQL. In practice the first argument would usually be a second dataframe.

In [31]:
df.join(df, ['column1'], how='inner')

DataFrame[column1: bigint, column2: bigint, column3: bigint, column2: bigint, column3: bigint]

If you need a left or right join, the dataframe you call join on (df here) acts as the left table and the one you pass as the first argument acts as the right one. Outer joins are also possible.
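To make the left/right distinction concrete, here is a plain-Python sketch of join semantics on lists of dicts. It is not how Spark executes joins; `join_rows`, `left` and `right` are hypothetical names, the data is made up, and only the inner and left variants are covered.

```python
def join_rows(left, right, key, how="inner"):
    """Join two lists of dicts on `key` (inner or left join only)."""
    if how not in ("inner", "left"):
        raise ValueError(f"unsupported join type: {how}")
    # Index the right-hand rows by key value for quick lookup.
    index = {}
    for right_row in right:
        index.setdefault(right_row[key], []).append(right_row)
    # Columns contributed by the right table, used to pad unmatched left rows.
    right_cols = [c for right_row in right for c in right_row if c != key]
    joined = []
    for left_row in left:
        matches = index.get(left_row[key], [])
        for right_row in matches:
            joined.append({**left_row, **right_row})
        if not matches and how == "left":
            # A left join keeps the row and fills right-hand columns with None.
            joined.append({**left_row, **{c: None for c in right_cols}})
    return joined


left = [{"column1": 1, "a": "x"}, {"column1": 4, "a": "y"}]
right = [{"column1": 1, "b": "z"}]

inner = join_rows(left, right, "column1", how="inner")
left_join = join_rows(left, right, "column1", how="left")
```

The inner join drops the left row with `column1 == 4` because nothing on the right matches it, while the left join keeps it with `b` set to `None`.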

`But on top of that, in Spark you can execute SQL much more directly. You can create a temporary view out of a dataframe`

In [32]:
df.createOrReplaceTempView('table1')

In [33]:
# and then perform SQL queries on it.
df2 = session.sql("SELECT column1 AS f1, column2 AS f2 FROM table1")

### Dataframe column operations

In [34]:
ADULT_COLUMN_NAMES = [
     "age",
     "workclass",
     "fnlwgt",
     "education",
     "education_num",
     "marital_status",
     "occupation",
     "relationship",
     "race",
     "sex",
     "capital_gain",
     "capital_loss",
     "hours_per_week",
     "native_country",
     "income"
 ]

In [36]:
csv_df = session.read.csv(
    '/Users/patrick/Desktop/Lighthouse_labs/Lighthouse-data-notes/Unit_8/Day_3/adult.data.csv',
    header=False,
    inferSchema=True
)
# The file has no header row, so replace Spark's default names (_c0, _c1, ...) one by one
for new_col, old_col in zip(ADULT_COLUMN_NAMES, csv_df.columns):
    csv_df = csv_df.withColumnRenamed(old_col, new_col)

                                                                                

In [37]:
csv_df.show(5)

+---+-----------------+--------+----------+-------------+-------------------+------------------+--------------+------+-------+------------+------------+--------------+--------------+------+
|age|        workclass|  fnlwgt| education|education_num|     marital_status|        occupation|  relationship|  race|    sex|capital_gain|capital_loss|hours_per_week|native_country|income|
+---+-----------------+--------+----------+-------------+-------------------+------------------+--------------+------+-------+------------+------------+--------------+--------------+------+
| 39|        State-gov| 77516.0| Bachelors|         13.0|      Never-married|      Adm-clerical| Not-in-family| White|   Male|      2174.0|         0.0|          40.0| United-States| <=50K|
| 50| Self-emp-not-inc| 83311.0| Bachelors|         13.0| Married-civ-spouse|   Exec-managerial|       Husband| White|   Male|         0.0|         0.0|          13.0| United-States| <=50K|
| 38|          Private|215646.0|   HS-grad|       

In [38]:
csv_df.describe().show()

+-------+------------------+------------+------------------+-------------+-----------------+--------------+-----------------+------------+-------------------+-------+------------------+----------------+------------------+--------------+------+
|summary|               age|   workclass|            fnlwgt|    education|    education_num|marital_status|       occupation|relationship|               race|    sex|      capital_gain|    capital_loss|    hours_per_week|native_country|income|
+-------+------------------+------------+------------------+-------------+-----------------+--------------+-----------------+------------+-------------------+-------+------------------+----------------+------------------+--------------+------+
|  count|             32561|       32561|             32561|        32561|            32561|         32561|            32561|       32561|              32561|  32561|             32561|           32561|             32561|         32561| 32561|
|   mean| 38.58164675532

                                                                                

In [40]:
# To get an aggregation, use groupBy together with the agg method.
# For instance, the following gives you a dataframe with the average and standard deviation of work hours by age. We also sort the dataframe by age.

work_hours_df = csv_df.groupBy(
    'age'
).agg(
    funcs.avg('hours_per_week'),
    funcs.stddev_samp('hours_per_week')
).sort('age')

In [41]:
work_hours_df.show(5)

+---+-------------------+---------------------------+
|age|avg(hours_per_week)|stddev_samp(hours_per_week)|
+---+-------------------+---------------------------+
| 17| 21.367088607594937|         10.021014993616216|
| 18| 25.912727272727274|         11.733362123434848|
| 19| 30.678370786516854|         12.119154493614719|
| 20|  32.28021248339974|         11.726599330994663|
| 21|  34.03472222222222|         12.040389374051912|
+---+-------------------+---------------------------+
only showing top 5 rows
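For intuition about what `funcs.avg` and `funcs.stddev_samp` compute per group, the same aggregation can be sketched in plain Python on a handful of made-up rows. The data and the `summarize_by_age` helper are hypothetical; `statistics.stdev` is the sample standard deviation, which is the quantity `stddev_samp` refers to.

```python
from collections import defaultdict
from statistics import mean, stdev

# Made-up (age, hours_per_week) pairs, for illustration only.
rows = [(17, 20.0), (17, 30.0), (18, 40.0), (18, 20.0), (18, 30.0)]


def summarize_by_age(pairs):
    """Group hours by age, then return (age, average, sample std dev) sorted by age."""
    groups = defaultdict(list)
    for age, hours in pairs:
        groups[age].append(hours)
    return [
        (age, mean(hours), stdev(hours))  # stdev needs at least two values
        for age, hours in sorted(groups.items())
    ]


summary = summarize_by_age(rows)
```

Each tuple in `summary` corresponds to one row of the aggregated dataframe: the grouping key followed by one value per aggregate.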



### Connecting to databases
It’s very likely that you keep your data in a relational database, handled by an RDBMS like MySQL or PostgreSQL. If that’s the case, don’t worry: Spark has ways of interacting with many kinds of data storage. I bet you can google your way through most IO needs you may have.

In [42]:
# First download the driver. In this case I’m using PostgreSQL, so I’ll download the PostgreSQL JDBC driver. Then I’ll copy it to the jars folder in my Spark installation.
# These settings are only read when the session is first created, so run this in the very first cell (or restart the kernel first).

jdbc_jar = '/Users/patrick/Desktop/Lighthouse_labs/Lighthouse-data-notes/Unit_8/Day_3/postgresql-42.5.4.jar'
session = SparkSession.builder.config(
    'spark.jars', jdbc_jar
).config(
    'spark.driver.extraClassPath', jdbc_jar
).getOrCreate()



23/05/24 22:43:08 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


This creates a session aware of the driver. You can then read from the database like this (replacing the fake configs with your real ones):

In [43]:
url = "jdbc:postgresql://127.0.0.1:5432/Northwind"
# Placeholder credentials; naming the driver class explicitly helps Spark locate the PostgreSQL driver
properties = {'user': 'user', 'password': 'password', 'driver': 'org.postgresql.Driver'}
# read from a table into a dataframe
df = session.read.jdbc(
    url=url, table='employees', properties=properties
)
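A PostgreSQL JDBC URL has the shape `jdbc:postgresql://host:port/database`, and a misplaced slash or colon is an easy mistake to make. A small helper can assemble it from its parts. `build_postgres_jdbc_url` is a hypothetical helper, not part of PySpark, and the host, port and database are the placeholder values used in this notebook.

```python
def build_postgres_jdbc_url(host, port, database):
    """Return a JDBC URL of the form jdbc:postgresql://host:port/database."""
    if not host or not database:
        raise ValueError("host and database must be non-empty")
    return f"jdbc:postgresql://{host}:{int(port)}/{database}"


url = build_postgres_jdbc_url("127.0.0.1", 5432, "Northwind")
```

The resulting string can be passed as the `url` argument of `session.read.jdbc` and `DataFrame.write.jdbc`.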


In [44]:
# Then you can create a transformed dataframe any way you want and write the data back to the database (maybe at a different table).
transformed_df.write.jdbc(
    url=url, table='new_table', mode='append', properties=properties
)



The writing modes according to the documentation are:

append: Append contents of this DataFrame to existing data.\
overwrite: Overwrite existing data.\
ignore: Silently ignore this operation if data already exists.\
error or “errorifexists” (default case): Throw an exception if data already exists.
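The four modes differ only in what happens when the target table already exists. The sketch below imitates that decision with a plain dict standing in for the database; `write_table` and `fake_db` are hypothetical names, and nothing here calls Spark.

```python
def write_table(db, table, rows, mode="error"):
    """Imitate the save modes against a dict of {table_name: list_of_rows}."""
    exists = table in db
    if mode == "append":
        db.setdefault(table, []).extend(rows)
    elif mode == "overwrite":
        db[table] = list(rows)
    elif mode == "ignore":
        if not exists:
            db[table] = list(rows)
    elif mode in ("error", "errorifexists"):
        if exists:
            raise RuntimeError(f"table {table!r} already exists")
        db[table] = list(rows)
    else:
        raise ValueError(f"unknown mode: {mode}")


fake_db = {"new_table": [1]}
write_table(fake_db, "new_table", [2], mode="append")     # keeps 1, adds 2
after_append = list(fake_db["new_table"])
write_table(fake_db, "new_table", [3], mode="ignore")     # table exists: no-op
after_ignore = list(fake_db["new_table"])
write_table(fake_db, "new_table", [4], mode="overwrite")  # replaces everything
after_overwrite = list(fake_db["new_table"])
```

Calling `write_table` with the default mode on an existing table raises, which mirrors why the default is the safest choice when you are not sure whether the table is already there.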