In [1]:
# Pipeline 1: loads data from the activity-data-p1 folder into BigQuery. New batches of collected data can be
# added to this folder at any time and are picked up by the pipeline automatically.

In [None]:
from pyspark.sql import SparkSession
from pyspark import SparkConf
# Add extra types to make sure loading data to GBQ goes as expected.
from pyspark.sql.types import StructType, StructField, LongType, StringType, DoubleType, TimestampType, IntegerType, FloatType
from time import sleep
import pyspark.sql.functions as F
import datetime as dt
#import spark.implicits._
#import org.apache.spark.sql.functions._


# Describe how this application connects to the cluster and which resources it requests.
# Every SparkConf setter returns the conf object itself, so the calls can be chained.
sparkConf = (
    SparkConf()
    .setMaster("spark://spark-master:7077")
    .setAppName("SparkStreaming pipeline 1")
    .set("spark.driver.memory", "2g")
    .set("spark.executor.cores", "1")
    .set("spark.driver.cores", "1")
)

# The SparkSession is the entry point to the Spark SQL engine.
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()

# Register the Hadoop file-system implementations for the gs:// scheme.
# This has to be done whenever GCS is used.
conf = spark.sparkContext._jsc.hadoopConfiguration()
for hadoop_key, hadoop_value in (
    ("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem"),
    ("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS"),
):
    conf.set(hadoop_key, hadoop_value)

# Cloud Storage bucket the connector uses for temporary BigQuery export data.
bucket = "de_jads_temp_snellejassie"
spark.conf.set('temporaryGcsBucket', bucket)

# Explicit schema for the incoming CSV files: (column name, Spark type) in file order.
# Every column is nullable.
dataSchema = StructType([
    StructField(column_name, column_type(), True)
    for column_name, column_type in (
        ("a", IntegerType),
        ("b", IntegerType),
        ("year", IntegerType),
        ("month", IntegerType),
        ("day", IntegerType),
        ("hour", IntegerType),
        ("generation_solar", IntegerType),
        ("generation_wind_offshore", IntegerType),
        ("generation_wind_onshore", IntegerType),
        ("forecast_solar_day_ahead", IntegerType),
        ("forecast_wind_onshore_day_ahead", IntegerType),
        ("price_day_ahead", FloatType),
        ("price_actual", FloatType),
    )
])

# Stream the CSV files placed in the activity-data-p1 folder of this first pipeline.
# The folder holds multiple files and new ones can be added periodically; at most one
# new file is considered per trigger (maxFilesPerTrigger = 1).
csv_stream_reader = spark.readStream.schema(dataSchema).option("maxFilesPerTrigger", 1)
sdf = csv_stream_reader.csv("/home/jovyan/data/activity-data/activity-data-p1")

# Show the column names and types to verify that the schema was applied as intended.
print(sdf)

# Group on every column and count the rows in each group.
grouping_columns = [
    "a", "b", "year", "month", "day", "hour",
    "price_actual", "price_day_ahead",
    "generation_wind_offshore", "generation_wind_onshore",
    "forecast_solar_day_ahead", "forecast_wind_onshore_day_ahead",
    "generation_solar",
]
activityCounts = sdf.groupBy(*grouping_columns).count()


def my_foreach_batch_function(df, batch_id):
    """Write one micro-batch to the BigQuery table using the batch writer API.

    Intended to be passed to ``foreachBatch`` as the sink callback.

    Args:
        df: DataFrame holding the data of the current micro-batch.
        batch_id: Identifier of the micro-batch; not used by this function.
    """
    target_table = 'de2022-assignment2-fresh.pipelines.tablepipeline1'

    bigquery_writer = df.write.format('bigquery')
    bigquery_writer = bigquery_writer.option('table', target_table)
    # "overwrite" replaces the current table contents with this batch.
    bigquery_writer = bigquery_writer.mode("overwrite")
    bigquery_writer.save()

# Sink: every micro-batch is handed to my_foreach_batch_function, which writes it to a
# BigQuery table. "complete" output mode emits the full aggregated result on each trigger;
# the processing-time trigger fires a micro-batch every two seconds.
bigquery_stream_writer = (
    activityCounts.writeStream
    .outputMode("complete")
    .trigger(processingTime='2 seconds')
    .foreachBatch(my_foreach_batch_function)
)
activityQuery = bigquery_stream_writer.start()

try:
    # Block this cell until the streaming query terminates.
    activityQuery.awaitTermination()
except KeyboardInterrupt:
    # Interrupting the kernel shuts the pipeline down: first the query, then Spark itself.
    activityQuery.stop()
    spark.stop()
    print("Stopped the streaming query and the spark context")

DataFrame[a: int, b: int, year: int, month: int, day: int, hour: int, generation_solar: int, generation_wind_offshore: int, generation_wind_onshore: int, forecast_solar_day_ahead: int, forecast_wind_onshore_day_ahead: int, price_day_ahead: float, price_actual: float]


In [65]:
# Stop the Spark session created in the pipeline cell. The KeyboardInterrupt handler of
# that cell already calls spark.stop(), so this cell covers the cases where the streaming
# cell ended in another way.
spark.stop()