In [23]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf, regexp_replace, lower, split, array_join, date_format
from pyspark.sql.types import StringType
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
import re
import logging

In [24]:
# Initialize logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

In [51]:
# Path to your PostgreSQL JDBC driver
jdbc_driver_path = r"C:\Users\jyuba\postgresql-42.7.2.jar"

In [52]:
# Initialize Spark session
try:
    spark = SparkSession.builder \
        .appName("Postgres to Spark DataFrame") \
        .config("spark.jars", jdbc_driver_path) \
        .getOrCreate()
    logger.info("Spark Session created successfully.")
except Exception as e:
    logger.error(f"Error creating Spark Session: {e}")
    raise

INFO:__main__:Spark Session created successfully.


In [53]:
# PostgreSQL connection properties
jdbc_url = "jdbc:postgresql://localhost:5432/ainbox"  # Update with your actual JDBC URL
table = "emails"  # Update with your actual table name
properties = {
    "user": "postgres",
    "password": "XXXXXXXXXX",
    "driver": "org.postgresql.Driver"
}

In [54]:
# Read data from PostgreSQL table into a Spark DataFrame
df = spark.read.jdbc(url=jdbc_url, table="emails", properties=properties)

In [55]:
df.show()

+----+--------+--------------------+--------------------+--------------------+---+---+----------+--------------------+--------------------+
|  id|email_id|             subject|              sender|          recipients| cc|bcc|      date|          body_plain|               links|
+----+--------+--------------------+--------------------+--------------------+---+---+----------+--------------------+--------------------+
|NULL|       1|Take the next ste...|Google Community ...|[jyubaeng@gmail.com]| []| []|2023-01-12|Let's get started...|[https://notifica...|
|NULL|       2|Jubyung, take the...|The Google Accoun...|[jyubaeng@gmail.com]| []| []|2023-01-17|Hi Jubyung,\r\n\r...|[https://notifica...|
|NULL|       3|      Security alert|Google <no-reply@...|[jyubaeng@gmail.com]| []| []|2023-01-17|[image: Google]\r...|[https://accounts...|
|NULL|       4|Recovery email ve...|Google <no-reply@...|[jyubaeng@gmail.com]| []| []|2023-01-17|[image: Google]\r...|[https://accounts...|
|NULL|       5|     

In [56]:
# Handling missing values
df = df.fillna({'subject': '', 'body_plain': ''})

In [57]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, regexp_replace, trim, to_date, hour, dayofweek, expr
from pyspark.ml.feature import Tokenizer

# Clean 'subject' column
emails_df = df.withColumn("subject_clean",
                        trim(
                            regexp_replace(
                                col("subject"),
                                r"\W", " "
                            )
                        )
                    )

# Tokenize 'subject' column
tokenizer = Tokenizer(inputCol="subject_clean", outputCol="subject_tokens")
emails_df = tokenizer.transform(emails_df)

# Extract domain from 'sender' column
emails_df = emails_df.withColumn("sender_domain",
                                 trim(
                                     regexp_replace(
                                         col("sender"),
                                         r".*@",
                                         ""
                                     )
                                 )
                                )

# Explode 'recipients', 'cc', 'bcc' arrays
emails_df = emails_df.withColumn("recipient", explode("recipients"))
emails_df = emails_df.withColumn("cc_recipient", explode("cc"))
emails_df = emails_df.withColumn("bcc_recipient", explode("bcc"))

# Extract domain from 'recipient', 'cc_recipient', 'bcc_recipient' columns
emails_df = emails_df.withColumn("recipient_domain",
                                 trim(
                                     regexp_replace(
                                         col("recipient"),
                                         r".*@",
                                         ""
                                     )
                                 )
                                )
emails_df = emails_df.withColumn("cc_recipient_domain",
                                 trim(
                                     regexp_replace(
                                         col("cc_recipient"),
                                         r".*@",
                                         ""
                                     )
                                 )
                                )
emails_df = emails_df.withColumn("bcc_recipient_domain",
                                 trim(
                                     regexp_replace(
                                         col("bcc_recipient"),
                                         r".*@",
                                         ""
                                     )
                                 )
                                )

# Convert 'date' to date type and extract additional features
emails_df = emails_df.withColumn("date", to_date(col("date")))
emails_df = emails_df.withColumn("hour", hour(col("date")))
emails_df = emails_df.withColumn("day_of_week", dayofweek(col("date")))

# Clean 'body_plain' column
emails_df = emails_df.withColumn("body_clean",
                                 trim(
                                     regexp_replace(
                                         col("body_plain"),
                                         r"<[^>]+>", ""  # Remove HTML tags
                                     )
                                 )
                                )

# Tokenize 'body_plain' column
tokenizer = Tokenizer(inputCol="body_clean", outputCol="body_tokens")
emails_df = tokenizer.transform(emails_df)

# Extract domain from 'links' array
emails_df = emails_df.withColumn("link_domain",
                                 explode(
                                     expr("filter(links, x -> regexp_extract(x, 'https?://([a-zA-Z0-9.-]+)', 1) is not null)")
                                 )
                                )

# Stop the Spark session
spark.stop()


+---+--------+-------+------+----------+---+---+----+----------+-----+-------------+--------------+-------------+---------+------------+-------------+----------------+-------------------+--------------------+----+-----------+----------+-----------+-----------+
| id|email_id|subject|sender|recipients| cc|bcc|date|body_plain|links|subject_clean|subject_tokens|sender_domain|recipient|cc_recipient|bcc_recipient|recipient_domain|cc_recipient_domain|bcc_recipient_domain|hour|day_of_week|body_clean|body_tokens|link_domain|
+---+--------+-------+------+----------+---+---+----+----------+-----+-------------+--------------+-------------+---------+------------+-------------+----------------+-------------------+--------------------+----+-----------+----------+-----------+-----------+
+---+--------+-------+------+----------+---+---+----+----------+-----+-------------+--------------+-------------+---------+------------+-------------+----------------+-------------------+--------------------+----+----

In [58]:
# Write DataFrame to PostgreSQL table
emails_df.write.jdbc(url=jdbc_url, table="emails_cleaned", mode="overwrite", properties=properties)

Py4JJavaError: An error occurred while calling o527.jdbc.
: java.lang.IllegalStateException: SparkContext has been shutdown
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2390)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2419)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2438)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2463)
	at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$1(RDD.scala:1039)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:1037)
	at org.apache.spark.sql.Dataset.$anonfun$foreachPartition$1(Dataset.scala:3514)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.sql.Dataset.$anonfun$withNewRDDExecutionId$1(Dataset.scala:4309)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.Dataset.withNewRDDExecutionId(Dataset.scala:4307)
	at org.apache.spark.sql.Dataset.foreachPartition(Dataset.scala:3514)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.saveTable(JdbcUtils.scala:901)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:65)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:859)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:388)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:361)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:248)
	at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:756)
	at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:1570)


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, TimestampType
from pyspark.sql.functions import col, explode, regexp_replace, trim, to_date, hour, dayofweek, expr
from pyspark.ml.feature import Tokenizer

# Initialize Spark session
spark = SparkSession.builder \
    .appName("EmailDataStreaming") \
    .getOrCreate()

# PostgreSQL connection properties
jdbc_url = "jdbc:postgresql://localhost:5432/ainbox"  # Update with your actual JDBC URL
table = "emails"  # Update with your actual table name
properties = {
    "user": "postgres",
    "password": "Habang3233",
    "driver": "org.postgresql.Driver"
}

# Define schema for emails table
schema = StructType() \
    .add("id", StringType()) \
    .add("email_id", StringType()) \
    .add("subject", StringType()) \
    .add("sender", StringType()) \
    .add("recipients", StringType()) \
    .add("cc", StringType()) \
    .add("bcc", StringType()) \
    .add("date", TimestampType()) \
    .add("body_plain", StringType()) \
    .add("links", StringType())

# Read data from PostgreSQL table as a streaming DataFrame and apply data processing
streaming_df = spark.readStream \
    .format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", table) \
    .option("user", properties["user"]) \
    .option("password", properties["password"]) \
    .option("driver", properties["driver"]) \
    .schema(schema) \
    .load() \
    .withColumn("subject_clean", trim(regexp_replace(col("subject"), r"\W", " "))) \
    .withColumn("sender_domain", trim(regexp_replace(col("sender"), r".*@",""))) \
    .withColumn("recipient_domain", trim(regexp_replace(col("recipients"), r".*@",""))) \
    .withColumn("cc_recipient_domain", trim(regexp_replace(col("cc"), r".*@",""))) \
    .withColumn("bcc_recipient_domain", trim(regexp_replace(col("bcc"), r".*@",""))) \
    .withColumn("date", to_date(col("date"))) \
    .withColumn("hour", hour(col("date"))) \
    .withColumn("day_of_week", dayofweek(col("date"))) \
    .withColumn("body_clean", trim(regexp_replace(col("body_plain"), r"<[^>]+>", ""))) \
    .withColumn("link_domain", explode(expr("filter(links, x -> regexp_extract(x, 'https?://([a-zA-Z0-9.-]+)', 1) is not null)")))

# Define data processing and insert back into the same table
query = streaming_df.writeStream \
    .foreachBatch(lambda batch_df, batch_id: batch_df.write.jdbc(url=jdbc_url, table="emails", mode="append", properties=properties)) \
    .outputMode("append") \
    .start()

# Wait for the termination of the query
query.awaitTermination()