#### First, increase the Java heap space by setting these configurations before creating your SparkSession:

In [None]:
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession

# Build the Spark configuration in one call: larger driver/executor heaps and
# a higher cap on the total size of results collected back to the driver.
conf = SparkConf().setAll([
    ("spark.driver.memory", "4g"),         # Increase driver memory
    ("spark.executor.memory", "4g"),       # Increase executor memory
    ("spark.driver.maxResultSize", "2g"),
])

# Obtain a session configured with `conf` (getOrCreate may hand back an
# already-running session instead of starting a new one).
spark = SparkSession.builder.config(conf=conf).getOrCreate()

#### If that's not enough, you can try writing in batches instead of all at once:

In [None]:
def write_in_batches(df, jdbc_url, table_name, batch_size=10000,
                     connection_properties=None):
    """Append ``df`` to a JDBC table using roughly ``batch_size`` rows per partition.

    The DataFrame is repartitioned so that each partition holds about
    ``batch_size`` rows, and the same value is passed to the JDBC writer as
    its ``batchsize`` option.

    Args:
        df: Spark DataFrame to write.
        jdbc_url: JDBC connection URL of the target database.
        table_name: Name of the destination table.
        batch_size: Target number of rows per partition and per JDBC batch.
            Must be a positive integer.
        connection_properties: Optional dict of JDBC connection properties.
            When omitted, the notebook-level ``properties`` dict is used, as
            in the original implementation.

    Raises:
        ValueError: If ``batch_size`` is not positive.
    """
    if batch_size <= 0:
        raise ValueError(
            f"batch_size must be a positive integer, got {batch_size!r}"
        )

    # Only touch the notebook-level `properties` global when the caller did
    # not pass explicit connection properties.
    jdbc_properties = (
        properties if connection_properties is None else connection_properties
    )

    total_rows = df.count()
    # Ceiling division gives the number of batch-sized partitions. Clamp to 1
    # because an empty DataFrame would otherwise request 0 partitions, which
    # repartition() rejects.
    num_partitions = max(1, (total_rows + batch_size - 1) // batch_size)

    (
        df.repartition(num_partitions)
        .write
        .option("batchsize", batch_size)
        .jdbc(url=jdbc_url,
              table=table_name,
              mode="append",
              properties=jdbc_properties)
    )

#### Another approach is to repartition your DataFrame before writing:

In [None]:
# Split the DataFrame into 10 partitions so each write task handles a smaller
# slice of the data (tune the count to your data size).
extracted_derived_df = extracted_derived_df.repartition(10)

# Append the repartitioned rows to the "derived_columns" table over JDBC.
(
    extracted_derived_df.write
    .mode("append")
    .jdbc(url=jdbc_url, table="derived_columns", properties=properties)
)

#### You can also try setting specific JDBC batch parameters:

In [None]:
# Add JDBC writer tuning to the shared connection properties (mutates
# `properties` in place): rows per insert batch and transaction isolation.
properties.update(
    batchsize="1000",  # Adjust based on your needs
    isolationLevel="READ_COMMITTED",
)

# Append to "derived_columns" with the writer's numPartitions option set to 10.
(
    extracted_derived_df.write
    .option("numPartitions", 10)
    .jdbc(
        url=jdbc_url,
        table="derived_columns",
        mode="append",
        properties=properties,
    )
)

#### New Approaches

In [None]:
import os
# Request 8g of driver and executor memory through the submit arguments.
# NOTE(review): presumably only effective if set before the Spark JVM is
# launched in this process -- confirm.
os.environ.update(
    PYSPARK_SUBMIT_ARGS='--driver-memory 8g --executor-memory 8g pyspark-shell',
)

from pyspark.conf import SparkConf
from pyspark.sql import SparkSession

# Larger-memory configuration, built as one fluent chain (each set() returns
# the same SparkConf).
conf = (
    SparkConf()
    .set("spark.driver.memory", "8g")
    .set("spark.executor.memory", "8g")
    .set("spark.driver.maxResultSize", "4g")
    .set("spark.sql.shuffle.partitions", "100")  # Increase shuffle partitions
    .set("spark.default.parallelism", "100")     # Increase parallelism
    .set("spark.memory.fraction", "0.8")         # Give more memory to execution
    .set("spark.memory.storageFraction", "0.2")  # Reduce storage fraction
)

# Obtain a session configured with `conf` (getOrCreate may hand back an
# already-running session instead of starting a new one).
spark = SparkSession.builder.config(conf=conf).getOrCreate()

# Break the data into much smaller chunks before writing.
df_size = extracted_derived_df.count()
target_size = 500  # KB per partition

# Use whichever is larger: one partition per 1000 rows, or the partition count
# derived from the hard-coded 19674 figure divided by the target size.
# NOTE(review): 19674 is presumably an observed task size in KB -- confirm.
num_partitions = max(df_size // 1000, 19674 // target_size + 1)

# Copy of the shared connection properties plus small-batch JDBC settings;
# the shared `properties` dict itself is left untouched.
small_batch_properties = dict(
    properties,
    rewriteBatchedStatements="true",
    batchsize="100",
    isolationLevel="READ_COMMITTED",
)

# Repartition, then append to "derived_columns" in small batches.
repartitioned_df = extracted_derived_df.repartition(num_partitions)
(
    repartitioned_df.write
    .option("batchsize", 100)  # Smaller batch size
    .option("numPartitions", num_partitions)
    .jdbc(
        url=jdbc_url,
        table="derived_columns",
        mode="append",
        properties=small_batch_properties,
    )
)

In [None]:
# Number of rows handed to each JDBC write; adjust based on your memory
# constraints.
chunk_size = 1000

# Materialise the whole Spark DataFrame as a pandas DataFrame so it can be
# sliced into chunk_size-row pieces below.
pandas_df = extracted_derived_df.toPandas()

from pyspark.sql import Row
from itertools import islice

def write_chunk(chunk_df):
    """Append one pandas chunk to the ``derived_columns`` table over JDBC.

    The chunk is turned into a list of per-row dicts, rebuilt as a Spark
    DataFrame, and written in append mode using the notebook-level
    ``spark``, ``jdbc_url`` and ``properties``.

    Args:
        chunk_df: pandas DataFrame slice to write.
    """
    records = chunk_df.to_dict('records')
    spark.createDataFrame(records).write.jdbc(
        url=jdbc_url,
        table="derived_columns",
        mode="append",
        properties=properties,
    )

# Walk the pandas DataFrame in chunk_size-row windows and write each window
# as its own JDBC append.
for i in range(0, len(pandas_df), chunk_size):
    chunk = pandas_df.iloc[i:i + chunk_size]
    write_chunk(chunk_df=chunk)

In [None]:
# Number of rows handed to each JDBC write; adjust based on your memory
# constraints.
chunk_size = 1000

# Materialise the whole Spark DataFrame as a pandas DataFrame so it can be
# sliced into chunk_size-row pieces below.
pandas_df = extracted_derived_df.toPandas()

from pyspark.sql import Row
from itertools import islice

def write_chunk(chunk_df):
    """Append one pandas chunk to the ``derived_columns`` table over JDBC.

    The chunk is turned into a list of per-row dicts, rebuilt as a Spark
    DataFrame, and written in append mode using the notebook-level
    ``spark``, ``jdbc_url`` and ``properties``.

    Args:
        chunk_df: pandas DataFrame slice to write.
    """
    records = chunk_df.to_dict('records')
    spark.createDataFrame(records).write.jdbc(
        url=jdbc_url,
        table="derived_columns",
        mode="append",
        properties=properties,
    )

# Walk the pandas DataFrame in chunk_size-row windows and write each window
# as its own JDBC append.
for i in range(0, len(pandas_df), chunk_size):
    chunk = pandas_df.iloc[i:i + chunk_size]
    write_chunk(chunk_df=chunk)