In [1]:
import datetime
import os
import psycopg2
from pyspark.sql import (
    SparkSession,
    functions
)

# Start Spark session.
# The MySQL and PostgreSQL JDBC driver jars are handed to Spark as one
# comma-separated "spark.jars" value.
# NOTE(review): these are absolute, machine-specific paths.
jdbc_driver_jars = ",".join([
    "/home/ollyhill/data-testing/mysql-connector-j-8.4.0.jar",
    "/home/ollyhill/data-testing/postgresql-42.7.7.jar",
])

spark = (
    SparkSession.builder
    .appName("ReadFromRDS")
    .config("spark.jars", jdbc_driver_jars)
    .getOrCreate()
)


# JDBC connection properties for the MySQL source.
# Host, database name and credentials all come from the environment; a
# missing variable raises KeyError here.
jdbc_url = "jdbc:mysql://{host}:3306/{database}".format(
    host=os.environ["PORTAL_RDS"],
    database=os.environ["PORTAL_DB"],
)
connection_properties = dict(
    user=os.environ["PORTAL_USER"],
    password=os.environ["PORTAL_PASSWORD"],
    driver="com.mysql.cj.jdbc.Driver",
)

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/07/24 09:58:41 WARN Utils: Your hostname, OH-LAPTOP, resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
25/07/24 09:58:41 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
25/07/24 09:58:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [2]:
# Cut-off timestamp (naive datetime, midnight on 15 June 2025) that the
# later cells compare `created_at` against.
relevant_dt = datetime.datetime(year=2025, month=6, day=15)

In [3]:
# Lowest api_calls.id fetched from the source database. It limits how much
# of the table is read; the following cell checks that the earliest
# `created_at` fetched still predates `relevant_dt`, so lower this value if
# that check fails.
MIN_API_CALL_ID = 70003133

# Spark's JDBC reader accepts a parenthesised, aliased subquery in place of a
# table name, so the id filter is part of the SQL sent to MySQL.
query = (
    "(SELECT id, user_id, device_id, created_at "
    f"FROM api_calls WHERE id >= {MIN_API_CALL_ID}) AS filtered"
)

df = spark.read.jdbc(
    url=jdbc_url,
    table=query,
    properties=connection_properties,
)

In [4]:
# Sanity check on the id lower bound used for the read: the earliest
# `created_at` fetched must be before `relevant_dt`, otherwise rows on or
# after the cut-off may have been left out.
min_ts = df.select(functions.min("created_at")).collect()[0][0]

# min() yields None when no rows (or only NULL timestamps) were read; check
# that explicitly so the failure is a clear message rather than a TypeError
# from comparing None with a datetime.
assert min_ts is not None, "No rows with a non-null created_at were read"
assert min_ts < relevant_dt, "Not enough rows looked at"

                                                                                

In [5]:
# Reduce the raw calls to one row per (user_id, device_id, calendar day),
# keeping only fully populated rows on or after `relevant_dt`.
# NOTE: `df` is rebound and `created_at` is dropped by the final select, so
# this cell cannot be re-run without re-running the read cell first.
has_all_keys = (
    (functions.col("user_id").isNotNull())
    & (functions.col("device_id").isNotNull())
    & (functions.col("created_at").isNotNull())
)
is_recent = functions.col("created_at") >= functions.lit(relevant_dt.isoformat())

df = (
    df
    .filter(has_all_keys & is_recent)
    .withColumn("created_date", functions.to_date("created_at"))
    .dropDuplicates(["user_id", "device_id", "created_date"])
    .select(["user_id", "device_id", "created_date"])
)

In [6]:
# Destination in PostgreSQL: schema and table that the aggregated rows are
# written to.
postgres_schema = "public"
postgres_table = "aggregated_api_calls"

In [7]:
# Spark data type name (as returned by `DataType.typeName()`) -> PostgreSQL
# column type used when generating the CREATE TABLE statement.
type_map = dict(
    string="TEXT",
    long="BIGINT",
    timestamp="TIMESTAMP",
    date="DATE",
)

In [8]:
# Build the DDL that recreates the destination table from the DataFrame
# schema. This is destructive: any existing table of the same name is
# dropped. A Spark type that is missing from `type_map` raises KeyError.
column_defs = ",".join(
    f"\n{field.name} {type_map[field.dataType.typeName()]}"
    for field in df.schema
)

query = (
    f"DROP TABLE IF EXISTS {postgres_schema}.{postgres_table};"
    f"CREATE TABLE {postgres_schema}.{postgres_table} ("
    f"{column_defs}"
    ");"
)

In [9]:

# Run the DROP/CREATE statement held in `query` against PostgreSQL.
# Connection settings come from the environment; a missing variable raises
# KeyError before any connection is attempted.
conn = psycopg2.connect(
    dbname=os.environ["POSTGRES_DB"],
    user=os.environ["POSTGRES_USER"],
    password=os.environ["POSTGRES_PASSWORD"],
    host=os.environ["POSTGRES_HOST"],
    port=os.environ["POSTGRES_PORT"]
)

try:
    # `with conn` commits if the block succeeds and rolls back if it raises;
    # it does NOT close the connection, hence the surrounding try/finally.
    with conn:
        # The cursor context manager closes the cursor on exit, even on error.
        with conn.cursor() as cur:
            cur.execute(query)
finally:
    # Always release the connection, including when the DDL fails.
    conn.close()


In [None]:


# Append the aggregated rows to the PostgreSQL table through Spark's JDBC
# writer. Connection details are read from the same POSTGRES_* environment
# variables used for the psycopg2 connection.
postgres_url = "jdbc:postgresql://{host}:{port}/{database}".format(
    host=os.environ["POSTGRES_HOST"],
    port=os.environ["POSTGRES_PORT"],
    database=os.environ["POSTGRES_DB"],
)

jdbc_write_options = {
    "url": postgres_url,
    "dbtable": f"{postgres_schema}.{postgres_table}",
    "user": os.environ["POSTGRES_USER"],
    "password": os.environ["POSTGRES_PASSWORD"],
    "driver": "org.postgresql.Driver",
}

(
    df.write
    .format("jdbc")
    .options(**jdbc_write_options)
    .mode("append")
    .save()
)

                                                                                

25/07/24 11:58:22 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 209707 ms exceeds timeout 120000 ms
25/07/24 11:58:22 WARN SparkContext: Killing executors is not supported by current scheduler.
25/07/24 11:58:27 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:53)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:342)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:132)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$