In [1]:
# import libraries
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window

# Connection / environment settings.
# Each value can be overridden with an environment variable, so credentials and
# machine-specific paths do not have to be edited into (and committed with) the
# notebook. The fallbacks reproduce the previous hard-coded local-dev values.
# NOTE(review): the default Cassandra credentials are still present as fallbacks;
# prefer exporting CASSANDRA_USERNAME / CASSANDRA_PASSWORD and dropping them.
CASSANDRA_HOST = os.environ.get("CASSANDRA_HOST", "localhost")
CASSANDRA_PORT = os.environ.get("CASSANDRA_PORT", "9042")
CASSANDRA_USERNAME = os.environ.get("CASSANDRA_USERNAME", "root")
CASSANDRA_PASSWORD = os.environ.get("CASSANDRA_PASSWORD", "admin")
BIGQUERY_CONNECTOR_JAR = os.environ.get(
    "BIGQUERY_CONNECTOR_JAR", "../../tools/spark-jars/spark-3.3-bigquery-0.31.1.jar"
)
BIGQUERY_KEYFILE = os.environ.get("BIGQUERY_KEYFILE", "../../tools/bigquery/key_file.json")

# initialize spark session: local mode, with the Cassandra connector settings and
# the BigQuery connector jar + service-account key file used by later cells.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("StagingToWarehouse")
    .config("spark.executor.memory", "2g")
    .config("spark.executor.cores", "2")
    .config("spark.driver.memory", "2g")
    .config("spark.driver.cores", "2")
    .config("spark.cassandra.connection.host", CASSANDRA_HOST)
    .config("spark.cassandra.connection.port", CASSANDRA_PORT)
    .config("spark.cassandra.auth.username", CASSANDRA_USERNAME)
    .config("spark.cassandra.auth.password", CASSANDRA_PASSWORD)
    .config("spark.jars", BIGQUERY_CONNECTOR_JAR)
    .config("spark.hadoop.google.cloud.auth.service.account.enable", "true")
    .config("spark.hadoop.google.cloud.auth.service.account.json.keyfile", BIGQUERY_KEYFILE)
    .getOrCreate()
)
# GCP project used as `parentProject` for the BigQuery reads in later cells
project_id = os.environ.get("GCP_PROJECT_ID", "test-373705")

In [8]:
# Incremental-load watermark: the highest id already present in the warehouse
# fact table. `max` here is pyspark.sql.functions.max (from the star import),
# not the Python builtin.
fact_records_max_id = spark.read \
                        .format("bigquery") \
                        .options(parentProject=project_id) \
                        .options(table='flight_delay.fact_records') \
                        .load().select('id') \
                        .agg(max("id")) \
                        .collect()[0][0]
# An empty fact table aggregates to None; fall back to -1 so that every staging
# row passes the `id > watermark` filter (assumes staging ids are >= 0 — confirm).
if fact_records_max_id is None:
    fact_records_max_id = -1


def read_staging_table(table_name, after_id):
    """Read a Cassandra staging table from the ``flight_delay`` keyspace.

    Args:
        table_name: name of the Cassandra table to load.
        after_id: watermark; only rows whose ``id`` is strictly greater are kept.

    Returns:
        A lazily evaluated Spark DataFrame with the id filter applied.
    """
    return spark.read \
                .format("org.apache.spark.sql.cassandra") \
                .options(keyspace="flight_delay", table=table_name) \
                .load() \
                .filter(col('id') > after_id)


# extract new rows from the `mysql` staging table
mysql = read_staging_table("mysql", fact_records_max_id)
# extract new rows from the `mongodb` staging table
mongodb = read_staging_table("mongodb", fact_records_max_id)
# Integrate both sources. unionByName matches columns by name instead of by
# position, so a difference in column order between the two tables cannot
# silently shift values into the wrong column (mismatched names raise instead).
source = mysql.unionByName(mongodb)

In [9]:
# Inspect the schema of the unioned staging DataFrame (the saved output below is truncated).
source.printSchema()

root
 |-- id: long (nullable = false)
 |-- actualelapsedtime: double (nullable = true)
 |-- airtime: double (nullable = true)
 |-- arrdel15: double (nullable = true)
 |-- arrdelay: double (nullable = true)
 |-- arrdelayminutes: double (nullable = true)
 |-- arrivaldelaygroups: double (nullable = true)
 |-- arrtime: double (nullable = true)
 |-- arrtimeblk: string (nullable = true)
 |-- cancellationcode: string (nullable = true)
 |-- cancelled: double (nullable = true)
 |-- carrierdelay: double (nullable = true)
 |-- crsarrtime: long (nullable = true)
 |-- crsdeptime: long (nullable = true)
 |-- crselapsedtime: double (nullable = true)
 |-- dayofmonth: long (nullable = true)
 |-- dayofweek: long (nullable = true)
 |-- departuredelaygroups: double (nullable = true)
 |-- depdel15: double (nullable = true)
 |-- depdelay: double (nullable = true)
 |-- depdelayminutes: double (nullable = true)
 |-- deptime: double (nullable = true)
 |-- deptimeblk: string (nullable = true)
 |-- dest: string 

In [10]:
# Load the existing date dimension: surrogate key `id` and natural key `flight_date`.
current_dim_date = spark.read \
                    .format("bigquery") \
                    .options(parentProject=project_id) \
                    .options(table='flight_delay.dim_date') \
                    .load().select('id', 'flight_date')
# Highest surrogate key already stored; new keys continue after it.
# `max` is pyspark.sql.functions.max (via the star import), not the builtin.
max_dim_date_id = current_dim_date.agg(max("id")).collect()[0][0]
# An empty dim_date aggregates to None; use -1 so the first new key becomes 0.
# (Previously this guard checked fact_records_max_id by mistake.)
if max_dim_date_id is None:
    max_dim_date_id = -1
# Distinct calendar attributes for each flight date present in the new staging rows.
source_dim_date = source.selectExpr("year", "quarter", "month", "dayofmonth as day_of_month", "dayofweek as day_of_week", "flightdate as flight_date") \
                    .distinct()
# Left join on the natural key: rows whose `id` stays null are dates not yet in dim_date.
joined_dim_date = source_dim_date.join(current_dim_date, on="flight_date", how="left")
# Assign consecutive surrogate keys after max_dim_date_id, in flight_date order.
# monotonically_increasing_id() was unsuitable: its values are not consecutive and
# start near 0 regardless of existing keys, so they collided with ids already in
# dim_date. The window has no partitionBy, so Spark numbers the rows in a single
# partition; presumably acceptable since only previously unseen dates are numbered.
new_date_order = Window.orderBy(col("flight_date").asc())
dim_date = joined_dim_date.filter(col("id").isNull()) \
                    .withColumn("id", (row_number().over(new_date_order) + lit(max_dim_date_id)).cast("long")) \
                    .orderBy(col("flight_date").asc())






In [64]:
# Stop the Spark session created in the first cell, releasing its resources.
spark.stop()