In [121]:
#buat pandasrawclient yang bentuknya dah langsung table data 
import os 
from dotenv import load_dotenv
import requests
from google.oauth2 import service_account
import xmltodict
import json
import pandas as pd 
from requests import request
import pytz
from bs4 import BeautifulSoup
from typing import Dict
from google.oauth2 import service_account
from google.cloud import storage

import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
import os
import sys
import re

import csv

from pyspark.sql.types import StructType, StructField, ArrayType, FloatType, BooleanType
from pyspark.sql.types import DoubleType, IntegerType, StringType, DataType
from pyspark.sql import functions as F

from entsoe import EntsoeRawClient
from entsoe import EntsoePandasClient





In [122]:
#load env variables
load_dotenv('./creds/.env', verbose=True, override=True)

True

In [4]:
print(os.environ.get("PYSPARK_DRIVER_PYTHON"))

None


In [123]:


'''
----------------
INIT VARIABLES
----------------
'''

#setting up entsoe variables
security_token = os.environ.get("SECURITY_TOKEN")
ENTSOE_URL = 'https://transparency.entsoe.eu/api'

#setting up GCP variables
service_account_file = os.environ.get("SERVICE_ACCOUNT_FILE")
credentials = service_account.Credentials.from_service_account_file(
    service_account_file
)
gcs_bucket = os.environ.get("CLOUD_STORAGE_BUCKET")
print(gcs_bucket)

entsoe_analytics_csv_1009


In [146]:

#setting up session
entsoe_client = EntsoePandasClient(security_token)
'''
----------------
SETTING UP FUNCTION CALLS 
----------------
'''

# upload data to GCS
def upload_blob_to_gcs(bucket_name, contents, destination_blob_name):
    # Upload file to bucket"""

    # ID of GCS bucket
    # bucket_name =

    # the contents from memory to be uploaded to file
    # contents =

    # the ID of your GCS object
    # destination_blob_name =

    storage_client = storage.Client(credentials=credentials)
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)

    blob.upload_from_string(contents)


'''
----------------
EXTRACTION
----------------
'''
#for test, we'll be querying

start=pd.Timestamp('202101010000', tz='Europe/Berlin')
end=pd.Timestamp('202101010600', tz='Europe/Berlin')
country_code= 'DE_TENNET'
country_code_from=''
country_code_to=''
type_marketagreement_type=''
contract_marketagreement_type=''
label_data='total_generation'

try:
    entsoe_data = entsoe_client.query_generation(country_code, start=start, end=end)
    #for those method which header is in first AND second row (place it in if block later)
    #get header correctly if it's for query generation
    entsoe_header_list = [f"{x} - {y}" for x,y in entsoe_data.columns]
    entsoe_data = entsoe_data[1:]
    #if header is already correct, header = True. if not, header = entsoe_header_list
    entsoe_csv = entsoe_data.to_csv(header=entsoe_header_list)
    
except Exception as e:
    print("An exception occurred:", e)


'''
----------------
LOAD
----------------
'''
#upload to GCS
landing_filename=f"{label_data}__{country_code}__{start}__{end}.csv"
upload_blob_to_gcs(bucket_name=gcs_bucket, contents=entsoe_csv, destination_blob_name=landing_filename)

In [127]:
## COBA SPARK GCS CONNECTOR ##


# berangkat pak haji
# setup parameternya
gcs_bucket = gcs_bucket
path = f"gs://{gcs_bucket}/{landing_filename}"
# path ="/home/rafzul/projects/entsoe-pipelines/sample.xml"

#coba spark gcs connector
#setup sparksession for entry point - COBA GCS CONNECTOR
SPARK_HOME = os.environ["SPARK_HOME"]
spark = SparkSession.builder.appName("gcp_playground") \
    .config("spark.jars", f"{SPARK_HOME}/jars/gcs-connector-hadoop3-latest.jar, {SPARK_HOME}/jars/spark-bigquery-with-dependencies_2.13-0.27.1.jar") \
    .config("spark.hadoop.fs.AbstractFileSystem.gs.impl", "google.cloud.hadoop.fs.gcs.GoogleHadoopFS") \
    .config("spark.hadoop.google.cloud.auth.service.account.enable", "true") \
    .config("spark.hadoop.google.cloud.auth.service.account.json.keyfile", service_account_file) \
    .getOrCreate()



In [150]:
#setiap jam 12 malam, ambil schema dari gcs
# download data from GCS
def download_blob_from_gcs(bucket_name, source_blob_name, local_source_blob_name):
    # Upload file to bucket"""

    # ID of GCS bucket
    # bucket_name =

    # the contents from memory to be uploaded to file
    # contents =

    # the ID of your GCS object
    # destination_blob_name =

    storage_client = storage.Client(credentials=credentials)
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(source_blob_name)

    blob.download_to_filename(local_source_blob_name)
    
    
source_schema_filename = f"schema_source/{label_data}__schema.json"
local_source_schema_filename = f"/home/rafzul/projects/entsoe-pipelines/schemas/source/{label_data}__schema.json"
download_blob_from_gcs(bucket_name=gcs_bucket, source_blob_name=source_schema_filename, local_source_blob_name=local_source_schema_filename)

In [158]:
#setting up schema - block

with open(local_source_schema_filename, "r") as local_source:
    source_schema_data = json.loads(local_source.read())
    source_schema_version = json_schema_data["version"]
    json_enforced_source_schema = StructType.fromJson(source_schema_data["schema"])
    

try: 
   df_spark = spark.read.format("csv").schema(json_enforced_source_schema) \
      .option("inferSchema","false") \
      .option("header","true") \
      .load(path)
except Exception as e:
   df_spark = spark.read.format("csv") \
      .option("inferSchema","true") \
      .option("header","true") \
      .load(path)
   schema_version = source_schema_version + 0.1
   new_schema = { "version": float(schema_version),"schema": df_spark.schema.jsonValue()}
   new_schema_json = json.dumps(new_schema)
   new_source_schema_filename_new = f"schema_source/{label_data}__schema__{schema_version}.json"
   upload_blob_to_gcs(bucket_name=gcs_bucket, contents=new_schema_json, destination_blob_name=new_source_schema_filenam)


   # create code to increment schema version
#inferSchema true kalo ngga ada kesalahan. except exception: inferSchema false dan .schema nya di set manual

In [159]:
df_spark.dtypes

[('_c0', 'timestamp'),
 ('Biomass - Actual Aggregated', 'double'),
 ('Fossil Brown coal/Lignite - Actual Aggregated', 'double'),
 ('Fossil Gas - Actual Aggregated', 'double'),
 ('Fossil Hard coal - Actual Aggregated', 'double'),
 ('Fossil Oil - Actual Aggregated', 'double'),
 ('Geothermal - Actual Aggregated', 'double'),
 ('Hydro Pumped Storage - Actual Aggregated', 'double'),
 ('Hydro Pumped Storage - Actual Consumption', 'double'),
 ('Hydro Run-of-river and poundage - Actual Aggregated', 'double'),
 ('Hydro Water Reservoir - Actual Aggregated', 'double'),
 ('Nuclear - Actual Aggregated', 'double'),
 ('Other - Actual Aggregated', 'double'),
 ('Other renewable - Actual Aggregated', 'double'),
 ('Solar - Actual Aggregated', 'double'),
 ('Waste - Actual Aggregated', 'double'),
 ('Wind Offshore - Actual Aggregated', 'double'),
 ('Wind Onshore - Actual Aggregated', 'double')]

In [143]:
# new_schema = { "version": float(schema_version),
#               "schema": df_spark.schema.jsonValue()}
# new_schema_json = json.dumps(new_schema)
# source_schema_filename = f"schema_source/{label_data}__schema__{schema_version}.json"
# upload_blob_to_gcs(bucket_name=gcs_bucket, contents=new_schema_json, destination_blob_name=source_schema_filename)



In [52]:
# #create schema to be enforced in subsequent json load operation
# with open("schema_raw_j", "w") as schrawjson:
#     schrawjson.write(df_spark_orig.schema.json())

In [17]:
# with open("schema_raw2.json", "r") as schrawjson:
#     json_schema_data = schrawjson.read()
#     json_enforced_schema = StructType.fromJson(json.loads(json_schema_data))
    

In [18]:
# #create dataframe from gcs
# path = f"gs://{gcs_bucket}/{landing_filename}"
# print(path)
# df_spark = spark.read.format("json").schema(json_enforced_schema) \
#    .option("header","true") \
#    .option("multiLine","true") \
#    .load(path) \
#    .select("GL_MarketDocument.*")
   

gs://entsoe_analytics_1009/entsoe_data_DE_TENNET.json


In [9]:
#clean non timeseries column name. change dot to underscores
df_spark = df_spark.toDF(*(c_name.replace(".", "_") for c_name in df_spark.columns))

#clean timeseries column. cast TimeSeries to new scheme where 1.dot in names are replaced with underscores and 2. Strange characters such as '@' or '#' are removed
ts_schema = df_spark.select("TimeSeries").dtypes[0][1]
replacements = [('\.', '_'), ('[@#]', '')]
for old, new in replacements:
    ts_schema = re.sub(old, new, ts_schema)
df_spark = df_spark.withColumn("TimeSeries", (F.col("TimeSeries").cast(ts_schema)))

In [10]:
#flatten column

# explode timeseries column into struct
df_spark = df_spark.withColumn("TimeSeries", F.explode("TimeSeries"))


In [11]:
df_spark.printSchema()

root
 |-- @xmlns: string (nullable = true)
 |-- mRID: string (nullable = true)
 |-- revisionNumber: string (nullable = true)
 |-- type: string (nullable = true)
 |-- process_processType: string (nullable = true)
 |-- sender_MarketParticipant_mRID: struct (nullable = true)
 |    |-- #text: string (nullable = true)
 |    |-- @codingScheme: string (nullable = true)
 |-- sender_MarketParticipant_marketRole_type: string (nullable = true)
 |-- receiver_MarketParticipant_mRID: struct (nullable = true)
 |    |-- #text: string (nullable = true)
 |    |-- @codingScheme: string (nullable = true)
 |-- receiver_MarketParticipant_marketRole_type: string (nullable = true)
 |-- createdDateTime: string (nullable = true)
 |-- time_Period_timeInterval: struct (nullable = true)
 |    |-- end: string (nullable = true)
 |    |-- start: string (nullable = true)
 |-- TimeSeries: struct (nullable = true)
 |    |-- MktPSRType: struct (nullable = true)
 |    |    |-- psrType: string (nullable = true)
 |    |-- P

In [97]:


# flatten TimeSeries if there is TimeSeries, flatten AttributeInstanceComponent if there is AttributeInstanceComponent
def unpack_df(nested_df):
    component_columns = ["TimeSeries", "AttributeInstanceComponent"]
    general_cols = [c for c in nested_df.columns if c not in component_columns]
    if "TimeSeries" in nested_df.columns:
        data_cols_name = "TimeSeries"
        data_cols = [c for c in nested_df.select("TimeSeries.*").columns]
    else:
        pass
    unpacked_df = nested_df.select(general_cols \
                                   + [F.col(data_cols_name+"."+c).alias(data_cols_name+"_"+c)\
                                      for c in data_cols])
    return unpacked_df
    

df_spark = unpack_df(df_spark)
df_spark.printSchema()

root
 |-- @xmlns: string (nullable = true)
 |-- mRID: string (nullable = true)
 |-- revisionNumber: string (nullable = true)
 |-- type: string (nullable = true)
 |-- process_processType: string (nullable = true)
 |-- sender_MarketParticipant_mRID: struct (nullable = true)
 |    |-- #text: string (nullable = true)
 |    |-- @codingScheme: string (nullable = true)
 |-- sender_MarketParticipant_marketRole_type: string (nullable = true)
 |-- receiver_MarketParticipant_mRID: struct (nullable = true)
 |    |-- #text: string (nullable = true)
 |    |-- @codingScheme: string (nullable = true)
 |-- receiver_MarketParticipant_marketRole_type: string (nullable = true)
 |-- createdDateTime: string (nullable = true)
 |-- time_Period_timeInterval: struct (nullable = true)
 |    |-- end: string (nullable = true)
 |    |-- start: string (nullable = true)
 |-- TimeSeries_MktPSRType: struct (nullable = true)
 |    |-- psrType: string (nullable = true)
 |-- TimeSeries_Period: struct (nullable = true)
 | 

In [102]:
#tulis data ke bigquery via temporary gcs bucket
df_spark.write \
  .format("bigquery") \
  .option("project","rafzul-analytics-1009") \
  .option("temporaryGcsBucket","entsoe_temp_1009") \
  .mode("append") \
  .save("rafzul-analytics-1009.entsoe_playground.fact_test")

                                                                                

In [None]:
# if df_spark2.select("TimeSeries"):
#     # rename all column (specifically replace dot with underscore) on every struct nested in TimeSeries array column. use transform
#     df_spark3 = df_spark3.withColumn("TimeSeries", F.transform \
#     ("TimeSeries", lambda el,ind: \
#     F.struct \
#     ("abc" for c_name in el.columns)))
    
# # # #get list of names of all columns in every struct inside TimeSeries array column
# # a = [x for x in df_spark2.select(("TimeSeries"))]
# # print(a)

# df_spark3.select("TimeSeries").show(10, truncate=False)
# a = F.struct(F.col("TimeSeries").getItem(c_name).alias(c_name.replace(".", "_")) for i, c_name in enumerate(df_spark2.schema["TimeSeries"].dataType.elementType.fieldNames()))
# # print(a


# F.struct(F.col("TimeSeries") for c_name in df_spark2.schema[x].dataType.names


# df_spark2.show()
# df_spark2.printSchema()

# # #get list of names of all columns in every struct inside TimeSeries array column
# a = [(c_name,i) for (c_name,i) in enumerate(df_spark2.schema["TimeSeries"].dataType.elementType.fieldNames())]
# print(a)


# df_spark2 = df_spark2.withColumn("TimeSeries", F.transform("TimeSeries", lambda x: F.struct(*[F.col("TimeSeries." + c_name.replace(".", "_")).alias(c_name) for c_name in x])))

In [16]:
# #for non timeseries column
# df_spark2 = df_spark2.toDF(*(c_name.replace(".", "_") for c_name in df_spark2.columns))
# b = df_spark2.select(F.col("TimeSeries"))
# #for timeseries one
# a = F.struct(F.col("TimeSeries").getItem(c_name).alias(c_name.replace(".", "_")) for i, c_name in enumerate(df_spark2.schema["TimeSeries"].dataType.elementType.fieldNames()))
# print(a)
# if df_spark2.select("TimeSeries"):
#     # rename all column (specifically replace dot with underscore) on every struct nested in TimeSeries array column. use transform, use Timeseries schema and its datatype.name o
#     df_spark2 = df_spark2.withColumn("TimeSeries", F.transform("TimeSeries", lambda x: (a)))
    
# # #get list of names of all columns in every struct inside TimeSeries array column
# # a = [c_name for c_name in enumerate(df_spark2.schema["TimeSeries"].dataType.elementType.fieldNames())]


# df_spark3.show()
# df_spark3.printSchema()



# # df_spark2 = df_spark2.withColumn("TimeSeries", F.transform("TimeSeries", lambda x: F.struct(*[F.col("TimeSeries." + c_name.replace(".", "_")).alias(c_name) for c_name in x])))

In [134]:
# #cast schema with the dot replaced with underscore, programatically


# ---- solution for  column names beside TimeSeries:
# changed_column_general = [(column_name, column_name.replace(".", "_")) for column_name in df_schema.fieldNames() if "." in x]
# for column_name, changed_column_name in changed_column_general:
#     df_schema = df_spark2.select([F.col(c).alias(mappingcolumn_name, changed_column_name)
# # UPDATE: there is even better solution

# #changed non timeseries column
# df_spark2 = df_spark2.toDF(*(c.replace(".", "_") for c in df_spark2.columns))

In [76]:
with open("example_destination.json", "w+") as output_file:
    output_file.write(df_spark2.toJSON())

TypeError: write() argument must be str, not RDD

In [None]:
# #inferring schema and get the data type of each column and turn it into spark dataframe
# datatype_infer = pd.DataFrame.from_dict(xml_file_dict[0], orient='index')

In [None]:
# #flatten nested df at every layer
# from pyspark.sql.types import *
# from pyspark.sql.import functions as f

# def flatten_structs(nested_df):
#     stack = [(), nested_df]
#     columns = []
    
#     while len(stack) > 0:
#         parents
        
    

In [None]:
# df_schema = df_spark.dtypes

# firstrow_orig = df_spark.collect()[0]
# new_header = [f"{x} - {firstrow_orig.__getitem__(x)}" for x in firstrow_orig.__fields__]

# df_spark = df_spark.where(F.col("_c0").isNotNull())
# df_spark.show()

# # #drop header and first column
