In [1]:
%pip install pyspark



Note: you may need to restart the kernel to use updated packages.


In [2]:
%pip install psycopg2-binary


Note: you may need to restart the kernel to use updated packages.


In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, mean
from sqlalchemy import create_engine
from pyspark.sql.functions import monotonically_increasing_id
import pandas as pd
import os
import psycopg2

In [4]:
# set java home
os.environ['JAVA_HOME'] = r'C:\Program Files\Java\jdk-11'

In [39]:
spark = SparkSession.builder.appName('DataExplore').getOrCreate()

In [7]:
spark

In [8]:
nugabank_df = spark.read.csv(r'C:\Users\PAMELA\Desktop\10alytics_notebooks\pysparkendtoend\dataset\nuga_bank_transactions.csv', header = True, inferSchema= True)

In [9]:
nugabank_df.show()

+--------------------+------+----------------+-----------------+--------------------+--------------------+--------------+--------------------+--------------------+--------------------+--------------------+-------------------+-------------------+--------------------+-------------+-------------+--------+-----+---------+--------------------+--------------------+------+--------------+
|    Transaction_Date|Amount|Transaction_Type|    Customer_Name|    Customer_Address|       Customer_City|Customer_State|    Customer_Country|             Company|           Job_Title|               Email|       Phone_Number| Credit_Card_Number|                IBAN|Currency_Code|Random_Number|Category|Group|Is_Active|        Last_Updated|         Description|Gender|Marital_Status|
+--------------------+------+----------------+-----------------+--------------------+--------------------+--------------+--------------------+--------------------+--------------------+--------------------+-------------------+-------

In [10]:
nugabank_df.describe().show()

+-------+------------------+----------------+-------------+--------------------+-------------+--------------+----------------+-------------+------------------+-------------------+--------------------+--------------------+--------------------+-------------+------------------+--------+------+---------+--------------------+------+--------------+
|summary|            Amount|Transaction_Type|Customer_Name|    Customer_Address|Customer_City|Customer_State|Customer_Country|      Company|         Job_Title|              Email|        Phone_Number|  Credit_Card_Number|                IBAN|Currency_Code|     Random_Number|Category| Group|Is_Active|         Description|Gender|Marital_Status|
+-------+------------------+----------------+-------------+--------------------+-------------+--------------+----------------+-------------+------------------+-------------------+--------------------+--------------------+--------------------+-------------+------------------+--------+------+---------+---------

In [11]:
nugabank_df.printSchema()

root
 |-- Transaction_Date: timestamp (nullable = true)
 |-- Amount: double (nullable = true)
 |-- Transaction_Type: string (nullable = true)
 |-- Customer_Name: string (nullable = true)
 |-- Customer_Address: string (nullable = true)
 |-- Customer_City: string (nullable = true)
 |-- Customer_State: string (nullable = true)
 |-- Customer_Country: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Job_Title: string (nullable = true)
 |-- Email: string (nullable = true)
 |-- Phone_Number: string (nullable = true)
 |-- Credit_Card_Number: long (nullable = true)
 |-- IBAN: string (nullable = true)
 |-- Currency_Code: string (nullable = true)
 |-- Random_Number: double (nullable = true)
 |-- Category: string (nullable = true)
 |-- Group: string (nullable = true)
 |-- Is_Active: string (nullable = true)
 |-- Last_Updated: timestamp (nullable = true)
 |-- Description: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Marital_Status: string (nullable = true)

In [12]:
for column in nugabank_df.columns:
    print(column, 'Nulls', nugabank_df.filter(nugabank_df[column].isNull()).count())

Transaction_Date Nulls 0
Amount Nulls 0
Transaction_Type Nulls 0
Customer_Name Nulls 100425
Customer_Address Nulls 100087
Customer_City Nulls 100034
Customer_State Nulls 100009
Customer_Country Nulls 100672
Company Nulls 100295
Job_Title Nulls 99924
Email Nulls 100043
Phone_Number Nulls 100524
Credit_Card_Number Nulls 100085
IBAN Nulls 100300
Currency_Code Nulls 99342
Random_Number Nulls 99913
Category Nulls 100332
Group Nulls 100209
Is_Active Nulls 100259
Last_Updated Nulls 100321
Description Nulls 100403
Gender Nulls 99767
Marital_Status Nulls 99904


In [13]:
num_rows = nugabank_df.count()
num_rows

1000000

In [14]:
num_columns = len(nugabank_df.columns)
num_columns

23

In [15]:
nugabank_df = nugabank_df.fillna({
    'Customer_Name' : 'unknown',
    'Customer_Address' : 'unknown',
    'Customer_City': 'unknown',
    'Customer_State': 'unknown',
    'Customer_Country': 'unknown',
    'Company': 'unknown',
    'Job_Title': 'unknown',
    'Email': 'unknown',
    'Phone_Number': 'unknown',
    'Credit_Card_Number': 0,
    'IBAN':'unknown',
    'Currency_Code': 'unknown',
    'Random_Number' :'0.0',
    'Category': 'unknown',
    'Group' :'unknown',
    'Is_Active': 'unknown',
    'Description': 'unknown',
    'Gender': 'unknown',
    'Marital_Status' :'unknown'
})

In [16]:
for column in nugabank_df.columns:
    print(column, 'Nulls', nugabank_df.filter(nugabank_df[column].isNull()).count())

Transaction_Date Nulls 0
Amount Nulls 0
Transaction_Type Nulls 0
Customer_Name Nulls 0
Customer_Address Nulls 0
Customer_City Nulls 0
Customer_State Nulls 0
Customer_Country Nulls 0
Company Nulls 0
Job_Title Nulls 0
Email Nulls 0
Phone_Number Nulls 0
Credit_Card_Number Nulls 0
IBAN Nulls 0
Currency_Code Nulls 0
Random_Number Nulls 0
Category Nulls 0
Group Nulls 0
Is_Active Nulls 0
Last_Updated Nulls 100321
Description Nulls 0
Gender Nulls 0
Marital_Status Nulls 0


In [17]:
nugabank_clean_df = nugabank_df.na.drop(subset=['last_Updated'])

In [18]:
for column in nugabank_clean_df.columns:
    print(column, 'Nulls', nugabank_clean_df.filter(nugabank_clean_df[column].isNull()).count())

Transaction_Date Nulls 0
Amount Nulls 0
Transaction_Type Nulls 0
Customer_Name Nulls 0
Customer_Address Nulls 0
Customer_City Nulls 0
Customer_State Nulls 0
Customer_Country Nulls 0
Company Nulls 0
Job_Title Nulls 0
Email Nulls 0
Phone_Number Nulls 0
Credit_Card_Number Nulls 0
IBAN Nulls 0
Currency_Code Nulls 0
Random_Number Nulls 0
Category Nulls 0
Group Nulls 0
Is_Active Nulls 0
Last_Updated Nulls 0
Description Nulls 0
Gender Nulls 0
Marital_Status Nulls 0


In [19]:
# viw summary statistics
nugabank_clean_df.describe().show()

+-------+------------------+----------------+-------------+--------------------+-------------+--------------+----------------+------------+------------------+-------------------+--------------------+--------------------+--------------------+-------------+-----------------+--------+-------+---------+--------------------+-------+--------------+
|summary|            Amount|Transaction_Type|Customer_Name|    Customer_Address|Customer_City|Customer_State|Customer_Country|     Company|         Job_Title|              Email|        Phone_Number|  Credit_Card_Number|                IBAN|Currency_Code|    Random_Number|Category|  Group|Is_Active|         Description| Gender|Marital_Status|
+-------+------------------+----------------+-------------+--------------------+-------------+--------------+----------------+------------+------------------+-------------------+--------------------+--------------------+--------------------+-------------+-----------------+--------+-------+---------+----------

In [20]:
nugabank_clean_df.columns

['Transaction_Date',
 'Amount',
 'Transaction_Type',
 'Customer_Name',
 'Customer_Address',
 'Customer_City',
 'Customer_State',
 'Customer_Country',
 'Company',
 'Job_Title',
 'Email',
 'Phone_Number',
 'Credit_Card_Number',
 'IBAN',
 'Currency_Code',
 'Random_Number',
 'Category',
 'Group',
 'Is_Active',
 'Last_Updated',
 'Description',
 'Gender',
 'Marital_Status']

In [21]:
transaction_df = nugabank_clean_df.select('Transaction_Date', 'Amount','Transaction_Type')
transaction_df.show()

+--------------------+------+----------------+
|    Transaction_Date|Amount|Transaction_Type|
+--------------------+------+----------------+
|2024-03-23 15:38:...| 34.76|      Withdrawal|
|2024-04-22 19:15:...|163.92|      Withdrawal|
|2024-04-12 19:46:...|386.32|      Withdrawal|
|2024-04-17 15:29:...|407.15|         Deposit|
|2024-02-10 01:51:...|161.31|         Deposit|
|2024-02-10 22:56:...|764.34|        Transfer|
|2024-04-07 00:07:...|734.59|         Deposit|
|2024-03-08 01:51:...|592.43|         Deposit|
|2024-02-01 12:34:...| 927.1|         Deposit|
|2024-03-22 16:46:...| 66.59|        Transfer|
|2024-04-23 13:30:...| 246.3|      Withdrawal|
|2024-01-13 01:22:...|782.32|      Withdrawal|
|2024-02-25 15:16:...|818.42|      Withdrawal|
|2024-01-01 20:55:...|352.23|      Withdrawal|
|2024-01-19 00:01:...|316.19|      Withdrawal|
|2024-04-09 14:40:...|662.26|      Withdrawal|
|2024-04-15 04:58:...|893.73|         Deposit|
|2024-04-12 14:32:...|746.22|      Withdrawal|
|2024-02-26 1

In [22]:
# ading transaction_id column
transaction_df = transaction_df.withColumn('transaction_id', monotonically_increasing_id())

In [23]:
# reordering columns
transaction_df = transaction_df.select('transaction_id', 'Transaction_Date', 'Amount','Transaction_Type')

In [24]:
transaction_df.show()

+--------------+--------------------+------+----------------+
|transaction_id|    Transaction_Date|Amount|Transaction_Type|
+--------------+--------------------+------+----------------+
|             0|2024-03-23 15:38:...| 34.76|      Withdrawal|
|             1|2024-04-22 19:15:...|163.92|      Withdrawal|
|             2|2024-04-12 19:46:...|386.32|      Withdrawal|
|             3|2024-04-17 15:29:...|407.15|         Deposit|
|             4|2024-02-10 01:51:...|161.31|         Deposit|
|             5|2024-02-10 22:56:...|764.34|        Transfer|
|             6|2024-04-07 00:07:...|734.59|         Deposit|
|             7|2024-03-08 01:51:...|592.43|         Deposit|
|             8|2024-02-01 12:34:...| 927.1|         Deposit|
|             9|2024-03-22 16:46:...| 66.59|        Transfer|
|            10|2024-04-23 13:30:...| 246.3|      Withdrawal|
|            11|2024-01-13 01:22:...|782.32|      Withdrawal|
|            12|2024-02-25 15:16:...|818.42|      Withdrawal|
|       

In [25]:
# customer table
customer_df = nugabank_clean_df.select('Customer_Name', 'Customer_Address', 'Customer_City', \
                         'Customer_State', 'Customer_Country','Email','Phone_Number') \
                     .withColumn('customer_id', monotonically_increasing_id()) \
                     .select('customer_id', 'Customer_Name', 'Customer_Address', 'Customer_City', \
                         'Customer_State', 'Customer_Country','Email','Phone_Number')
customer_df.show()

+-----------+-----------------+--------------------+--------------------+--------------+--------------------+--------------------+-------------------+
|customer_id|    Customer_Name|    Customer_Address|       Customer_City|Customer_State|    Customer_Country|               Email|       Phone_Number|
+-----------+-----------------+--------------------+--------------------+--------------+--------------------+--------------------+-------------------+
|          0|       James Neal|54912 Holmes Lodg...|   West Keithborough|       Florida|                Togo|             unknown|  493.720.6609x7545|
|          1|      Thomas Long| 1133 Collin Passage|          Joshuabury|   Connecticut|Lao People's Demo...|michellelynch@exa...|      (497)554-3317|
|          2|   Ashley Shelton|5297 Johnson Port...|         North Maria|    New Jersey|              Bhutan| ljordan@example.org|      (534)769-3072|
|          3|    James Rosario|56955 Moore Glens...|  North Michellefurt|    New Mexico|      

In [26]:
customer_df = nugabank_clean_df.select('Customer_Name', 'Customer_Address', 'Customer_City', \
                         'Customer_State', 'Customer_Country','Email','Phone_Number').distinct() \
                     .withColumn('customer_id', monotonically_increasing_id()) \
                     .select('customer_id', 'Customer_Name', 'Customer_Address', 'Customer_City', \
                         'Customer_State', 'Customer_Country','Email','Phone_Number')
customer_df.show()

+-----------+------------------+--------------------+--------------------+--------------+--------------------+--------------------+--------------------+
|customer_id|     Customer_Name|    Customer_Address|       Customer_City|Customer_State|    Customer_Country|               Email|        Phone_Number|
+-----------+------------------+--------------------+--------------------+--------------+--------------------+--------------------+--------------------+
|          0|    Allen Castillo|   5750 Vanessa Neck|     New Vickiemouth|North Carolina|              Zambia|             unknown|  732.974.7438x89666|
|          1|        Tina Jones|28150 Kelsey Stat...|             unknown|          Iowa|               Qatar|gabriellemoore@ex...|    736.645.3977x275|
|          2|    Michael Murphy|894 Williams Ridg...|       Dominguezview|      New York|              Sweden|kristinstanley@ex...|+1-693-739-2204x8851|
|          3|       Brian Glenn|505 Mcdowell Gard...|South Christinech...|  South 

In [27]:
customer_df.count()

899679

In [28]:
# employee table
employee_df = nugabank_clean_df.select('Company', 'Job_Title', 'Email', 'Phone_Number', \
                     'Gender', 'Marital_Status') \
                     .withColumn('employee_id', monotonically_increasing_id()) \
                     .select('employee_id', 'Company', 'Job_Title', 'Email', 'Phone_Number', \
                     'Gender', 'Marital_Status')
employee_df.show()

+-----------+--------------------+--------------------+--------------------+-------------------+-------+--------------+
|employee_id|             Company|           Job_Title|               Email|       Phone_Number| Gender|Marital_Status|
+-----------+--------------------+--------------------+--------------------+-------------------+-------+--------------+
|          0|Benson, Johnson a...|             unknown|             unknown|  493.720.6609x7545|  Other|      Divorced|
|          1|             unknown|   Food technologist|michellelynch@exa...|      (497)554-3317| Female|       Married|
|          2|       Jones-Mueller|Database administ...| ljordan@example.org|      (534)769-3072|  Other|       unknown|
|          3|       Vargas-Harris|Horticultural the...|parkerjames@examp...|+1-447-900-1320x257|unknown|       unknown|
|          4|Richardson, Gonza...|   Minerals surveyor| zweaver@example.net|            unknown| Female|       Married|
|          5|           Smith Ltd| Seism

In [29]:
# fact table
fact_table = nugabank_clean_df.join(transaction_df, ['Transaction_Date', 'Amount', 'Transaction_Type'], 'inner') \
                     .join(customer_df, ['Customer_Name', 'Customer_Address', 'Customer_City', \
                         'Customer_State', 'Customer_Country'], 'inner') \
                     .join(employee_df, ['Company', 'Job_Title', 'Email', 'Phone_Number', \
                         'Gender', 'Marital_Status'], 'inner') \
                     .select('transaction_id', 'customer_id', 'employee_id', 'Credit_Card_Number',
                         'IBAN', 'Currency_Code', 'Random_Number','Category', 'Group', \
                         'Is_Active', 'Last_Updated', 'Description')
fact_table.show()

+--------------+-----------+-----------+-------------------+--------------------+-------------+-------------+--------+-------+---------+--------------------+--------------------+
|transaction_id|customer_id|employee_id| Credit_Card_Number|                IBAN|Currency_Code|Random_Number|Category|  Group|Is_Active|        Last_Updated|         Description|
+--------------+-----------+-----------+-------------------+--------------------+-------------+-------------+--------+-------+---------+--------------------+--------------------+
|   17179987341|17179981666|17179987341|     30233184463165|GB82ZFSS908131813...|          PAB|       6515.0|       B|unknown|      Yes|2021-12-12 16:34:...|Without shake mom...|
|   17179896094|     166686|17179896094|      4193772209051|             unknown|          AMD|       3869.0|       B|      Y|      Yes|2021-02-07 00:32:...|Defense billion c...|
|   17179996613|25769958116|17179996613|      4155644689430|GB84PDNR042799442...|          ALL|       178

In [30]:
# fact table
fact_table = nugabank_clean_df.join(transaction_df, ['Transaction_Date', 'Amount', 'Transaction_Type'], 'left') \
                     .join(customer_df, ['Customer_Name', 'Customer_Address', 'Customer_City', \
                         'Customer_State', 'Customer_Country'], 'left') \
                     .join(employee_df, ['Company', 'Job_Title', 'Email', 'Phone_Number', \
                         'Gender', 'Marital_Status'], 'left') \
                     .select('transaction_id', 'customer_id', 'employee_id', 'Credit_Card_Number',
                         'IBAN', 'Currency_Code', 'Random_Number','Category', 'Group', \
                         'Is_Active', 'Last_Updated', 'Description')
fact_table.show()

+--------------+-----------+-----------+-------------------+--------------------+-------------+-------------+--------+-------+---------+--------------------+--------------------+
|transaction_id|customer_id|employee_id| Credit_Card_Number|                IBAN|Currency_Code|Random_Number|Category|  Group|Is_Active|        Last_Updated|         Description|
+--------------+-----------+-----------+-------------------+--------------------+-------------+-------------+--------+-------+---------+--------------------+--------------------+
|            16|      32038|         16|   3564854271916761|GB58VVHN678787830...|          AED|       2546.0|       B|      Y|      Yes|2021-09-14 07:15:...|Letter hand soldi...|
|             9| 8589953692|          9|    213171934267032|GB05MHXA361278613...|          BAM|       3958.0| unknown|unknown|      Yes|2022-09-19 03:52:...|He wish soldier t...|
|   17179869192|     150742|17179869192|   4000677407528931|GB08CDPI833234711...|          KES|       161

In [None]:
#output the transformed data to parquet
transaction_df.write.mode('overwrite').parquet(r'dataset/transaction')
customer_df.write.mode('overwrite').parquet(r'dataset/customer')
employee_df.write.mode('overwrite').parquet(r'dataset/employee')
fact_table.write.mode('overwrite').parquet(r'dataset/fact_table')

In [None]:
#output the transformed data to csv
transaction_df.repartition(3).write.mode('overwrite').option('header', 'true').csv(r'dataset/transformeddata/csv/transaction')
customer_df.repartition(3).write.mode('overwrite').option('header', 'true').csv(r'dataset/transformeddata/csv/customer')
employee_df.repartition(3).write.mode('overwrite').option('header', 'true').csv(r'dataset/transformeddata/csv/employee')
fact_table.repartition(3).write.mode('overwrite').option('header', 'true').csv(r'dataset/transformeddata/csv/fact_table')

In [32]:
# convert spark to pandas df
transaction_pd_df = transaction_df.toPandas()
customer_pd_df = customer_df.toPandas()
employee_pd_df = employee_df.toPandas()
fact_table_pd_df = fact_table.toPandas()

In [34]:
#loading the dataset into postgresql DB


# Define database connection parameters
db_params = {
    'username': 'postgres',
    'password': 'root1986%40',
    'host': 'localhost',
    'port': '5432',
    'database': 'nuga_bank'
    
}

# Define the database connection URL with db parameters
db_url = f"postgresql://{db_params['username']}:{db_params['password']}@{db_params['host']}:{db_params['port']}/{db_params['database']}"

# Create the database engine with db URL
engine = create_engine(db_url)


# connect to postgresql server

# Connection is automatically closed when the with block exits

with engine.connect() as connection:
  # create tables and load the data
  transaction_pd_df.to_sql('transaction_df', connection, if_exists = 'replace', index = False)
  customer_pd_df.to_sql('customer_df', connection, if_exists = 'replace', index = False)
  employee_pd_df.to_sql('employee_df', connection, if_exists = 'replace', index = False )
  fact_table_pd_df.to_sql('fact_table_pd_df', connection, if_exists = 'replace', index = False)

print('database table loaded succesfully')


database table loaded succesfully


In [55]:
# Develop Functions to Get Database Connection
def get_db_connection():
    connection = psycopg2.connect(
        host='localhost',
        database='nuga_bank',
        user='postgres',
        password='root1986@'
    )
    return connection

# Connect to SQL Database
conn = get_db_connection()

In [58]:
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder \
    .appName("nuga_bank") \
    .config("spark.jars", "C:\\jdbc\\postgresql-42.7.3.jar") \
    .getOrCreate()

In [59]:




# Connect to SQL Database
conn = get_db_connection()
# Create a function to create tables
def create_table():
    conn = get_db_connection()
    cursor = conn.cursor()
    create_table_query = '''
                        DROP TABLE IF EXISTS customer;
                        DROP TABLE IF EXISTS transaction;
                        DROP TABLE IF EXISTS employee;
                        DROP TABLE IF EXISTS fact_table;

                        CREATE TABLE customer (
                            customer_id BIGINT,
                            Customer_Name VARCHAR(10000),
                            Customer_Address VARCHAR(10000),
                            Customer_City VARCHAR(10000),
                            Customer_State VARCHAR(10000),
                            Customer_Country VARCHAR(10000)
                        );
                        
                        CREATE TABLE transaction (
                            transaction_id BIGINT,
                            Transaction_Date DATE,
                            Amount FLOAT,
                            Transaction_Type VARCHAR(10000)
                        );

                        CREATE TABLE employee (
                            employee_id BIGINT,
                            Company VARCHAR(10000),
                            Job_Title VARCHAR(10000),
                            Email VARCHAR(10000),
                            Phone_Number VARCHAR(10000),
                            Gender VARCHAR(10000),
                            Marital_Status VARCHAR(10000)
                        );

                        CREATE TABLE fact_table (
                            transaction_id BIGINT,
                            customer_id BIGINT,
                            employee_id BIGINT,
                            Credit_Card_Number BIGINT,
                            IBAN VARCHAR(10000),
                            Currency_Code VARCHAR(10000),
                            Random_Number FLOAT,
                            Category VARCHAR(10000),
                            "Group" VARCHAR(10000),
                            Is_Active VARCHAR(10000),
                            Last_Updated DATE,
                            Description VARCHAR(10000)
                        );
                        '''
    cursor.execute(create_table_query)
    conn.commit()
    cursor.close()
    conn.close()
create_table()
url = "jdbc:postgresql://localhost:5432/nuga_bank"
properties = {
    "user" : "postgres",
    "password" : "root1986@",
    "driver" : "org.postgresql.Driver"
}

customer_df.write.jdbc(url=url, table="customer", mode="append", properties=properties)
employee_df.write.jdbc(url=url, table="employee", mode="append", properties=properties)
transaction_df.write.jdbc(url=url, table="transaction", mode="append", properties=properties)
fact_table.write.jdbc(url=url, table="fact_table", mode="append", properties=properties)

Py4JJavaError: An error occurred while calling o438.jdbc.
: java.lang.ClassNotFoundException: org.postgresql.Driver
	at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:476)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:588)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521)
	at org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry$.register(DriverRegistry.scala:46)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1(JDBCOptions.scala:103)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1$adapted(JDBCOptions.scala:103)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:103)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:254)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:258)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:47)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:859)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:388)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:361)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:248)
	at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:756)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:834)


In [49]:
customer_pd_df.head()

Unnamed: 0,customer_id,Customer_Name,Customer_Address,Customer_City,Customer_State,Customer_Country,Email,Phone_Number
0,0,Allen Castillo,5750 Vanessa Neck,New Vickiemouth,North Carolina,Zambia,unknown,732.974.7438x89666
1,1,Tina Jones,28150 Kelsey Station Suite 560,unknown,Iowa,Qatar,gabriellemoore@example.net,736.645.3977x275
2,2,Michael Murphy,894 Williams Ridges Apt. 635,Dominguezview,New York,Sweden,kristinstanley@example.com,+1-693-739-2204x8851
3,3,Brian Glenn,505 Mcdowell Gardens Suite 851,South Christinechester,South Dakota,Lesotho,bcabrera@example.net,001-962-928-1897x95101
4,4,Jennifer Liu,07805 Taylor Locks Suite 278,South Lisa,Wyoming,unknown,fergusonjustin@example.com,688.888.9882x6972


In [None]:
from sqlalchemy import create_engine
import pandas as pd

# Define the database connection string
database_url = "postgresql+psycopg2://postgres:password@localhost/nuga_bank"

# Create a SQLAlchemy engine
engine = create_engine(database_url)

# Assuming customer_pd_df, employee_pd_df, transaction_pd_df, fact_table_pd_df are your pandas DataFrames

# Write the DataFrames to the SQL tables
customer_pd_df.to_sql('customer', engine, if_exists='append', index=False)
employee_pd_df.to_sql('employee', engine, if_exists='append', index=False)
transaction_pd_df.to_sql('transaction', engine, if_exists='append', index=False)
fact_table_pd_df.to_sql('fact_table', engine, if_exists='append', index=False)


In [None]:
nugabank_df = nugabank_df.withColumn('TotalCost', col)

In [None]:
filtered_nugabank_df = nugabank_df.filter(col('Quantity')>5)
filtered_nugabank_df()

In [None]:
nugabank_df_id = nugabank_df.select('', '',)

In [None]:
nugabank_df.write.partitionBy('Transaction_Date').parquet('partitioned_data')

: 

In [None]:
new_nugabank_df = spark.read.parquet('partitioned_data')
new_nugabank_df.show()

In [None]:
# nugabank_df = ecom_df.withColumn('ClientName', col('Customer_'))