In [1]:
!apt-get install -y python3-asyncpg
!apt-get install -y python3-pandas

!apt-get install -y wget
!wget -nc https://jdbc.postgresql.org/download/postgresql-42.2.5.jar

Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
python3-asyncpg is already the newest version (0.29.0-1build1).
0 upgraded, 0 newly installed, 0 to remove and 12 not upgraded.
Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
python3-pandas is already the newest version (2.1.4+dfsg-7).
0 upgraded, 0 newly installed, 0 to remove and 12 not upgraded.
Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
wget is already the newest version (1.21.4-1ubuntu4.1).
0 upgraded, 0 newly installed, 0 to remove and 12 not upgraded.
File ‘postgresql-42.2.5.jar’ already there; not retrieving.



In [2]:
import pyspark
print(pyspark.__version__)

3.5.5


In [10]:
import os
import requests
import time
import json
from pathlib import Path
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import from_json, to_json, col
from pyspark.sql.types import StructType, StructField, StringType, LongType, ArrayType, MapType, StringType, IntegerType, TimestampType, DoubleType

DB_CONFIG = {
    "user": os.getenv("POSTGRES_USER", "admin"),
    "password": os.getenv("POSTGRES_PASSWORD", "admin"),
    "database": os.getenv("POSTGRES_DB", "tender"),
    "host": os.getenv("POSTGRES_HOST", "postgre"),
    "port": int(os.getenv("POSTGRES_PORT", 5432))
}

current_dir = Path.cwd()
jar_path = current_dir / "postgresql-42.2.5.jar"

if not jar_path.exists():
    print(f"PostgreSQL JDBC Driver not found at {jar_path}. Please download it from https://jdbc.postgresql.org/download.html")
    raise FileNotFoundError(f"PostgreSQL JDBC Driver not found: {jar_path}")
    
spark = SparkSession.builder \
                    .appName("REST_API_with_PySpark_DF") \
                    .config("spark.jars", jar_path) \
                    .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")

schema = StructType([
    StructField("id", StringType(), True),
    StructField("date", StringType(), True),
    StructField("deadline_date", StringType(), True),
    StructField("title", StringType(), True),
    StructField("category", StringType(), True),
    StructField("description", StringType(), True),
    StructField("phase", StringType(), True),
    StructField("place", StringType(), True),
    StructField("awarded_value", StringType(), True),
    StructField("awarded_currency", StringType(), True),
    StructField("purchaser_name", StringType(), True),
    StructField("awarded", ArrayType(StructType([
        StructField("date", StringType(), True),
        StructField("suppliers", ArrayType(StructType([ 
            StructField("name", StringType(), True),
            StructField("id", LongType(), True),
            StructField("slug", StringType(), True)
        ])), True),
        StructField("count", LongType(), True),
        StructField("offers_count", ArrayType(LongType()), True),
        StructField("suppliers_name", StringType(), True),
        StructField("value", LongType(), True)
    ])), True)
])

def get_tenders(page=1):
    url = f"https://tenders.guru/api/hu/tenders?page={page}"
    response = requests.get(url)

    try:
        if response.status_code == 200:
            return response.json()
    except Exception as e:
        print(f"Error fetching data: {e}")
        return None

def create_df(data, json_data, schema):
    """
    Create a DataFrame from the fetched data and select relevant columns.
    """
    df = spark.createDataFrame(data, schema=schema)

    # Flatten nested fields
    df = df.withColumn("awarded_date", F.col("awarded").getItem(0).getField("date")) \
           .withColumn("suppliers_name", F.col("awarded").getItem(0).getField("suppliers_name")) \
           .withColumn("count", F.col("awarded").getItem(0).getField("count")) \
           .withColumn("offers_count", F.col("awarded").getItem(0).getField("offers_count").getItem(0)) \
           .withColumn("request", F.lit(json.dumps(json_data)))

    # Cast
    df = df.withColumn("id", F.col("id").cast(IntegerType())) \
           .withColumn("date", F.col("date").cast(TimestampType())) \
           .withColumn("deadline_date", F.col("deadline_date").cast(TimestampType())) \
           .withColumn("title", F.col("title").cast(StringType())) \
           .withColumn("category", F.col("category").cast(StringType())) \
           .withColumn("description", F.col("description").cast(StringType())) \
           .withColumn("phase", F.col("phase").cast(StringType())) \
           .withColumn("place", F.col("place").cast(StringType())) \
           .withColumn("awarded_value", F.col("awarded_value").cast(DoubleType())) \
           .withColumn("awarded_currency", F.col("awarded_currency").cast(StringType())) \
           .withColumn("awarded_date", F.col("awarded_date").cast(TimestampType())) \
           .withColumn("suppliers_name", F.col("suppliers_name").cast(StringType())) \
           .withColumn("count", F.col("count").cast(IntegerType())) \
           .withColumn("offers_count", F.col("offers_count").cast(IntegerType())) \
           .withColumn("request_json", F.col("request").cast(StringType())) # original request json as str

    selected_columns = [
        "id", 
        "date", 
        "deadline_date", 
        "title", 
        "category", 
        "description", 
        "phase", 
        "place", 
        "awarded_value", 
        "awarded_currency",
        "awarded_date", 
        "suppliers_name",
        "count", 
        "offers_count",
        "request_json"
    ]
    
    return df.select(*selected_columns)

def insert_to_postgres(df, table_name):
    """
    Insert the DataFrame into PostgreSQL using JDBC.
    https://stackoverflow.com/questions/77279712/how-to-insert-json-string-from-spark-into-column-of-type-jsonb-in-postgres
    """
    df.write \
      .format("jdbc")\
      .option("url", f"jdbc:postgresql://{DB_CONFIG['host']}:{DB_CONFIG['port']}/{DB_CONFIG['database']}") \
      .option("driver", "org.postgresql.Driver") \
      .option("dbtable", "tenders") \
      .option("user", DB_CONFIG["user"]) \
      .option("password", DB_CONFIG["password"]) \
      .option("stringtype", "unspecified") \
      .mode("overwrite") \
      .save()

def main():
    page = 1
    continue_fetching = True

    while continue_fetching:
        json_data = get_tenders(page)

        if json_data is not None:
            data = json_data["data"]
            total_pages = json_data['page_count']

            df = create_df(data, json_data, schema)

            df.show(5)

            insert_to_postgres(df, "tenders")

            if page >= total_pages:
                continue_fetching = False
            else:
                page += 1

            time.sleep(10)
        else:
            print("No data fetched or an error occurred.")
            continue_fetching = False

if __name__ == "__main__":
    main()


+-----+-------------------+-------------------+--------------------+-------------+--------------------+--------------------+--------------------+-------------+----------------+-------------------+--------------------+-----+------------+--------------------+
|   id|               date|      deadline_date|               title|     category|         description|               phase|               place|awarded_value|awarded_currency|       awarded_date|      suppliers_name|count|offers_count|        request_json|
+-----+-------------------+-------------------+--------------------+-------------+--------------------+--------------------+--------------------+-------------+----------------+-------------------+--------------------+-----+------------+--------------------+
|36077|2021-08-01 00:00:00|2021-08-12 00:00:00|"Régészeti feltár...|constructions|Vállalkozási kere...|E60 - Szerződéskö...|     HU MAGYARORSZÁG|       2.99E8|             HUF|2021-11-17 00:00:00|Salisbury Régésze...|    1|   

                                                                                

KeyboardInterrupt: 