In [1]:
# Dependencies: uncomment to install them into the kernel's environment.
# %pip install sqlalchemy
# %pip install psycopg2-binary

In [1]:
import itertools
import os

import pandas as pd
from pyspark.sql import SparkSession
from sqlalchemy import create_engine


In [2]:

# Connection settings for the target PostgreSQL database.
# Every value can be overridden with a POSTGRES_* environment variable so real
# credentials never have to be written into the notebook. The fallbacks are the
# original local-development values. The POSTGRES_ prefix is deliberate: a bare
# "USER" variable is normally already set by the shell.
USER = os.environ.get("POSTGRES_USER", "postgres")
PASSWORD = os.environ.get("POSTGRES_PASSWORD", "postgres")
HOST = os.environ.get("POSTGRES_HOST", "localhost")
PORT = int(os.environ.get("POSTGRES_PORT", "5432"))
DB = os.environ.get("POSTGRES_DB", "sales_data")

In [3]:
# Engine shared by the loader functions defined further down in this notebook.
engine = create_engine(f'postgresql://{USER}:{PASSWORD}@{HOST}:{PORT}/{DB}')

# Test the connection. The context manager closes it again, so the check does
# not leave a connection checked out for the lifetime of the kernel.
with engine.connect():
    pass

<sqlalchemy.engine.base.Connection at 0x7ad853f12840>

In [5]:
# Create (or reuse) the Spark session used to read the parquet files.
# The driver memory setting is passed as the "spark.driver.memory" option.
spark = (
    SparkSession.builder
    .config("spark.driver.memory", "12g")
    .appName("ParquetToPostgres")
    .getOrCreate()
)

25/04/14 12:09:29 WARN Utils: Your hostname, codespaces-240bfe resolves to a loopback address: 127.0.0.1; using 10.0.11.180 instead (on interface eth0)
25/04/14 12:09:29 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/04/14 12:09:30 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [6]:
def chunk_iterator(iterator, chunk_size):
    """
    Yield successive chunks of a given size from an iterator.

    Args:
        iterator: Any iterable or iterator. Re-iterables such as lists are
            accepted as well as one-shot iterators.
        chunk_size: Maximum number of items per chunk; must be at least 1.

    Yields:
        list: Up to ``chunk_size`` consecutive items. Only the final chunk may
        be shorter. Nothing is yielded for an empty input.

    Raises:
        ValueError: If ``chunk_size`` is smaller than 1. The check runs when
            the generator is first advanced.
    """
    if chunk_size < 1:
        # A size of 0 would otherwise yield nothing and silently drop all data.
        raise ValueError(f"chunk_size must be a positive integer, got {chunk_size!r}")

    # islice() on a re-iterable (list, str, ...) restarts from the first item on
    # every call, which would yield the first chunk forever. Pin a single
    # iterator so each islice() call continues where the previous one stopped.
    source = iter(iterator)
    while True:
        chunk = list(itertools.islice(source, chunk_size))
        if not chunk:
            return
        yield chunk


In [11]:

def load_to_postgres_in_chunks(spark_df, table_name, engine=engine, chunk_size=100000, if_exists="append"):
    """
    Write data from a Spark DataFrame to a PostgreSQL table in chunks.

    Rows are pulled through ``spark_df.toLocalIterator()`` and written
    ``chunk_size`` rows at a time, so each insert only builds a pandas
    DataFrame for one chunk rather than for the whole dataset.

    Args:
        spark_df: Spark DataFrame to load.
        table_name: Name of the destination table.
        engine: Connectable handed to ``DataFrame.to_sql``. The default is the
            notebook's ``engine`` as it existed when this cell was executed;
            re-run this cell after re-creating ``engine``.
        chunk_size: Maximum number of rows per insert.
        if_exists: ``to_sql`` mode used for the FIRST chunk only ("append",
            "replace" or "fail"). Later chunks always append. The default
            "append" keeps the previous behaviour, which means running the
            load twice inserts every row twice; pass "replace" to rebuild the
            table the way ``load_to_postgres`` does.

    Returns:
        int: Number of rows inserted, including rows written before an error.

    Errors are reported with ``print`` rather than raised. Chunks written
    before a failure are not rolled back by this function.
    """
    total_rows = 0
    try:
        # Create an iterator over the DataFrame's rows.
        rows = spark_df.toLocalIterator()
        for chunk_index, chunk in enumerate(chunk_iterator(rows, chunk_size), start=1):
            # Convert each Row object to a dictionary, then to a Pandas DataFrame.
            pdf = pd.DataFrame([row.asDict() for row in chunk])
            # Only the first write may replace/fail. Later chunks must append,
            # otherwise each chunk would wipe out the one before it.
            mode = if_exists if chunk_index == 1 else "append"
            pdf.to_sql(name=table_name, con=engine, if_exists=mode, index=False)
            total_rows += len(pdf)
            print(f"Inserted chunk {chunk_index} with {len(pdf)} rows; Total inserted rows: {total_rows}")
        print(f"Successfully wrote data to PostgreSQL table '{table_name}'.")
    except Exception as e:
        # Report how far the load got so a partially filled table is visible.
        print(f"Error processing table {table_name} after inserting {total_rows} rows: {e}")
    return total_rows

In [8]:
def load_to_postgres(df_spark, table_name):
    """
    Load a whole Spark DataFrame into a PostgreSQL table in one step.

    The DataFrame is converted with ``toPandas()`` and written through the
    notebook-level ``engine`` using ``if_exists="replace"``, so an existing
    table of the same name is replaced. Failures are printed, not raised.
    """
    try:
        # Convert to pandas, then let pandas write the table via SQLAlchemy.
        pandas_frame = df_spark.toPandas()
        pandas_frame.to_sql(
            table_name,
            engine,
            if_exists="replace",
            index=False,
        )
        print(f"Successfully wrote data to PostgreSQL table '{table_name}'.")
    except Exception as exc:
        print(f"Error processing table {table_name}: {exc}")

In [9]:
# Names of the datalake tables to load; the load loop processes them in this order.
tables = ["categories", "cities", "countries", "customers", "employees", "products", "sales"]

In [None]:
# Tables with more rows than this are written in chunks instead of being
# converted with a single toPandas() call.
CHUNKED_LOAD_THRESHOLD = 100000

for table in tables:
    print(f"Loading {table} data into PostgreSQL.")
    parquet_path = f"../data/datalake/{table}/*.parquet"
    df_spark = spark.read.parquet(parquet_path)
    # count() is a Spark action; run it once and reuse the result instead of
    # triggering it again for the size check below.
    row_count = df_spark.count()
    print(f"Loading data from {parquet_path} with {row_count} records.")
    if row_count > CHUNKED_LOAD_THRESHOLD:
        load_to_postgres_in_chunks(df_spark, table)
    else:
        load_to_postgres(df_spark, table)
    

Loading categories data into PostgreSQL.


Loading data from ../../data/datalake/categories/*.parquet with 11 records.
Successfully wrote data to PostgreSQL table 'categories'.
Loading cities data into PostgreSQL.
Loading data from ../../data/datalake/cities/*.parquet with 96 records.
Successfully wrote data to PostgreSQL table 'cities'.
Loading countries data into PostgreSQL.
Loading data from ../../data/datalake/countries/*.parquet with 206 records.
Successfully wrote data to PostgreSQL table 'countries'.
Loading customers data into PostgreSQL.
Loading data from ../../data/datalake/customers/*.parquet with 98759 records.
Successfully wrote data to PostgreSQL table 'customers'.
Loading employees data into PostgreSQL.
Loading data from ../../data/datalake/employees/*.parquet with 23 records.
Successfully wrote data to PostgreSQL table 'employees'.
Loading products data into PostgreSQL.
Loading data from ../../data/datalake/products/*.parquet with 452 records.
Successfully wrote data to PostgreSQL table 'products'.
Loading sales

25/04/14 12:09:44 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


Inserted chunk 1 with 100000 rows; Total inserted rows: 100000
Inserted chunk 2 with 100000 rows; Total inserted rows: 200000
Inserted chunk 3 with 100000 rows; Total inserted rows: 300000
Inserted chunk 4 with 100000 rows; Total inserted rows: 400000
Inserted chunk 5 with 100000 rows; Total inserted rows: 500000
Inserted chunk 6 with 100000 rows; Total inserted rows: 600000
Inserted chunk 7 with 100000 rows; Total inserted rows: 700000
Inserted chunk 8 with 100000 rows; Total inserted rows: 800000
Inserted chunk 9 with 100000 rows; Total inserted rows: 900000
Inserted chunk 10 with 100000 rows; Total inserted rows: 1000000
Inserted chunk 11 with 100000 rows; Total inserted rows: 1100000
Inserted chunk 12 with 100000 rows; Total inserted rows: 1200000
Inserted chunk 13 with 100000 rows; Total inserted rows: 1300000
Inserted chunk 14 with 100000 rows; Total inserted rows: 1400000
Inserted chunk 15 with 100000 rows; Total inserted rows: 1500000
Inserted chunk 16 with 100000 rows; Total i

[Stage 56:>                                                         (0 + 1) / 1]

Inserted chunk 18 with 100000 rows; Total inserted rows: 1800000
Inserted chunk 19 with 100000 rows; Total inserted rows: 1900000
Inserted chunk 20 with 100000 rows; Total inserted rows: 2000000
Inserted chunk 21 with 100000 rows; Total inserted rows: 2100000
Inserted chunk 22 with 100000 rows; Total inserted rows: 2200000
Inserted chunk 23 with 100000 rows; Total inserted rows: 2300000
Inserted chunk 24 with 100000 rows; Total inserted rows: 2400000
Inserted chunk 25 with 100000 rows; Total inserted rows: 2500000
Inserted chunk 26 with 100000 rows; Total inserted rows: 2600000
Inserted chunk 27 with 100000 rows; Total inserted rows: 2700000
Inserted chunk 28 with 100000 rows; Total inserted rows: 2800000
Inserted chunk 29 with 100000 rows; Total inserted rows: 2900000
Inserted chunk 30 with 100000 rows; Total inserted rows: 3000000
Inserted chunk 31 with 100000 rows; Total inserted rows: 3100000
Inserted chunk 32 with 100000 rows; Total inserted rows: 3200000
Inserted chunk 33 with 10

[Stage 57:>                                                         (0 + 1) / 1]

Inserted chunk 36 with 100000 rows; Total inserted rows: 3600000
Inserted chunk 37 with 100000 rows; Total inserted rows: 3700000
Inserted chunk 38 with 100000 rows; Total inserted rows: 3800000
Inserted chunk 39 with 100000 rows; Total inserted rows: 3900000
Inserted chunk 40 with 100000 rows; Total inserted rows: 4000000
Inserted chunk 41 with 100000 rows; Total inserted rows: 4100000
Inserted chunk 42 with 100000 rows; Total inserted rows: 4200000
Inserted chunk 43 with 100000 rows; Total inserted rows: 4300000
Inserted chunk 44 with 100000 rows; Total inserted rows: 4400000
Inserted chunk 45 with 100000 rows; Total inserted rows: 4500000
Inserted chunk 46 with 100000 rows; Total inserted rows: 4600000
Inserted chunk 47 with 100000 rows; Total inserted rows: 4700000
Inserted chunk 48 with 100000 rows; Total inserted rows: 4800000
Inserted chunk 49 with 100000 rows; Total inserted rows: 4900000
Inserted chunk 50 with 100000 rows; Total inserted rows: 5000000
Inserted chunk 51 with 10

[Stage 58:>                                                         (0 + 1) / 1]

Inserted chunk 53 with 100000 rows; Total inserted rows: 5300000
Inserted chunk 54 with 100000 rows; Total inserted rows: 5400000
Inserted chunk 55 with 100000 rows; Total inserted rows: 5500000
Inserted chunk 56 with 100000 rows; Total inserted rows: 5600000
Inserted chunk 57 with 100000 rows; Total inserted rows: 5700000
Inserted chunk 58 with 100000 rows; Total inserted rows: 5800000
Inserted chunk 59 with 100000 rows; Total inserted rows: 5900000
Inserted chunk 60 with 100000 rows; Total inserted rows: 6000000
Inserted chunk 61 with 100000 rows; Total inserted rows: 6100000
Inserted chunk 62 with 100000 rows; Total inserted rows: 6200000
Inserted chunk 63 with 100000 rows; Total inserted rows: 6300000
Inserted chunk 64 with 100000 rows; Total inserted rows: 6400000
Inserted chunk 65 with 100000 rows; Total inserted rows: 6500000
Inserted chunk 66 with 100000 rows; Total inserted rows: 6600000
Inserted chunk 67 with 100000 rows; Total inserted rows: 6700000
Inserted chunk 68 with 58