# Data Pipeline - Where the Sun Shines in Europe?

### Import Packages

In [None]:
import requests
import zipfile
import io
import pandas as pd
import os
from google.cloud import storage

In [2]:
zip_url = "https://knmi-ecad-assets-prd.s3.amazonaws.com/download/ECA_blend_ss.zip"

# Local folder path to save the downloaded zip file
local_folder_path = "/Users/marcelaulloa/Data_Zoomcamp/de-project-zoomcamp/where_the_sun_shines_EU/data"

# Ensure the local folder exists
if not os.path.exists(local_folder_path):
    os.makedirs(local_folder_path)

# Filename for the downloaded zip file
filename = os.path.join(local_folder_path, "ECA_blend_ss.zip")

# Download the zip file
response = requests.get(zip_url)

# Check if the request was successful (status code 200)
if response.status_code == 200:
    # Save the zip file to the local folder
    with open(filename, "wb") as file:
        file.write(response.content)
    print("Zip file downloaded successfully and saved to:", filename)
else:
    print("Failed to download the zip file.")

Zip file downloaded successfully and saved to: /Users/marcelaulloa/Data_Zoomcamp/de-project-zoomcamp/where_the_sun_shines_EU/data/ECA_blend_ss.zip


In [3]:
zip_file_path = "/Users/marcelaulloa/Data_Zoomcamp/de-project-zoomcamp/where_the_sun_shines_EU/data/ECA_blend_ss.zip"

# Directory where you want to extract the files
extract_to_dir = "/Users/marcelaulloa/Data_Zoomcamp/de-project-zoomcamp/where_the_sun_shines_EU/data/ECA_blend_ss/raw/"

# Ensure the local folder exists
if not os.path.exists(extract_to_dir):
    os.makedirs(extract_to_dir)

# Check if the zip file exists
if os.path.exists(zip_file_path):
    # Open the zip file
    with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
        # Extract all contents to the specified directory
        zip_ref.extractall(extract_to_dir)
        print("Extraction completed successfully.")
else:
    print("The specified zip file does not exist.")


Extraction completed successfully.


In [5]:
# Set up Google Cloud Storage client
storage_client = storage.Client()

# Path to the folder containing the text files
folder_path = "/Users/marcelaulloa/Data_Zoomcamp/de-project-zoomcamp/where_the_sun_shines_EU/data/ECA_blend_ss/raw/"

output_path = "/Users/marcelaulloa/Data_Zoomcamp/de-project-zoomcamp/where_the_sun_shines_EU/data/ECA_blend_ss/processed/"

# Ensure the local output folder exists
if not os.path.exists(output_path):
    os.makedirs(output_path)


# Name of the Google Cloud Storage bucket
bucket_name = "sunshine-eu-bucket"

# List all files in the folder
files = os.listdir(folder_path)

count = 0
# Iterate over each file
for file_name in files:
    # Check if the file is a txt file and starts with "SS_"
    if file_name.endswith('.txt') and file_name.startswith('SS_'):
        # Path to the input file
        input_file_path = os.path.join(folder_path, file_name)
        # Path to the output file
        output_file_path = os.path.join(output_path, f"processed_{file_name}")

        # Open input file for reading
        with open(input_file_path, 'r') as input_file:
            # Skip the first 20 lines
            lines = input_file.readlines()[20:]

        # Write the remaining content to the output file
        with open(output_file_path, 'w') as output_file:
            output_file.writelines(lines)

        # Save local copy of the CSV file
        local_output_file_path = os.path.join(output_path, f"processed_{file_name[:-4]}.csv")
        os.rename(output_file_path, local_output_file_path)
        print(f"Local copy of file 'processed_{file_name[:-4]}.csv' saved at: {local_output_file_path}")

        # Uploading files to Google Cloud Storage
        # bucket = storage_client.bucket(bucket_name)
        # blob = bucket.blob(f"processed_{file_name[:-4]}.csv")  # Change file extension to CSV
        # blob.upload_from_filename(output_file_path)
        # print(f"File 'processed_{file_name[:-4]}.csv' uploaded to Google Cloud Storage.")

        # If you want to limit the file quantity to save Google Storage Space uncomment the following lines:
        # if count <10:
        #     count = count+1
        #     print(count)
        # else:
        #     break

Local copy of file 'processed_SS_STAID004876.csv' saved at: /Users/marcelaulloa/Data_Zoomcamp/de-project-zoomcamp/where_the_sun_shines_EU/data/ECA_blend_ss/processed/processed_SS_STAID004876.csv
Local copy of file 'processed_SS_STAID004692.csv' saved at: /Users/marcelaulloa/Data_Zoomcamp/de-project-zoomcamp/where_the_sun_shines_EU/data/ECA_blend_ss/processed/processed_SS_STAID004692.csv
Local copy of file 'processed_SS_STAID024542.csv' saved at: /Users/marcelaulloa/Data_Zoomcamp/de-project-zoomcamp/where_the_sun_shines_EU/data/ECA_blend_ss/processed/processed_SS_STAID024542.csv
Local copy of file 'processed_SS_STAID001715.csv' saved at: /Users/marcelaulloa/Data_Zoomcamp/de-project-zoomcamp/where_the_sun_shines_EU/data/ECA_blend_ss/processed/processed_SS_STAID001715.csv
Local copy of file 'processed_SS_STAID024581.csv' saved at: /Users/marcelaulloa/Data_Zoomcamp/de-project-zoomcamp/where_the_sun_shines_EU/data/ECA_blend_ss/processed/processed_SS_STAID024581.csv
Local copy of file 'proce

In [6]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
from pyspark.sql.types import StringType

### PySpark for Data transformation and processing

In [7]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/04/14 16:09:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [8]:
from pyspark.sql.functions import regexp_replace, col

# Define a function to clean numeric fields
def clean_numeric_fields(col):
    # Use regular expression to remove non-numeric characters
    return regexp_replace(col, "[^0-9-.]", "")

# Rename columns and remove spaces from numeric fields
def df_spark_clean_spaces(df):
    # Apply the cleaning function to each column
    for col_name in df.columns:
        df = df.withColumn(col_name, clean_numeric_fields(df[col_name]))

    new_columns = ['STAID', 'SOUID', 'DATE', 'SS', 'Q_SS']
    df = df.toDF(*new_columns)
    return df

In [9]:
from pyspark.sql.functions import to_date

# Transform column types
def df_spark_transform(df):
    # Transform string column to DoubleType
    df = df.withColumn("SS", df["SS"].cast("double"))
    
    # Transform string column to LongType
    df = df.withColumn("STAID", df["STAID"].cast("long"))
    df = df.withColumn("SOUID", df["SOUID"].cast("long"))
    df = df.withColumn("Q_SS", df["Q_SS"].cast("long"))
    
    # Transform string column to TimestampType
    df = df.withColumn("DATE", to_date("DATE", "yyyyMMdd"))
    
    return df

In [10]:
import os

os.environ["PYSPARK_SUBMIT_ARGS"] = "--driver-memory 2g"
os.environ["PYSPARK_SUBMIT_ARGS"] = "--executor-memory 8g"

In [11]:
folder_path = "/Users/marcelaulloa/Data_Zoomcamp/de-project-zoomcamp/where_the_sun_shines_EU/data/ECA_blend_ss/processed"
output_path = "/Users/marcelaulloa/Data_Zoomcamp/de-project-zoomcamp/where_the_sun_shines_EU/data/ECA_blend_ss/spark/"

# Ensure the local folder exists
if not os.path.exists(output_path):
    os.makedirs(output_path)

# Read all cvs files in folder
df = spark.read.csv(folder_path+"/*", header=True, inferSchema=True)

# Rename columns and remove spaces from numeric fields
df = df_spark_clean_spaces(df)

# Transform column types
df = df_spark_transform(df)
    
df \
    .repartition(24) \
    .write \
    .parquet(output_path, mode="overwrite")
df.show()

24/04/14 16:13:18 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
24/04/14 16:13:26 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
24/04/14 16:13:32 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
24/04/14 16:13:33 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
                                                                                

+-----+------+----------+-------+----+
|STAID| SOUID|      DATE|     SS|Q_SS|
+-----+------+----------+-------+----+
|   16|126088|1775-04-01|-9999.0|   9|
|   16|126088|1775-04-02|-9999.0|   9|
|   16|126088|1775-04-03|-9999.0|   9|
|   16|126088|1775-04-04|-9999.0|   9|
|   16|126088|1775-04-05|-9999.0|   9|
|   16|126088|1775-04-06|-9999.0|   9|
|   16|126088|1775-04-07|-9999.0|   9|
|   16|126088|1775-04-08|-9999.0|   9|
|   16|126088|1775-04-09|-9999.0|   9|
|   16|126088|1775-04-10|-9999.0|   9|
|   16|126088|1775-04-11|-9999.0|   9|
|   16|126088|1775-04-12|-9999.0|   9|
|   16|126088|1775-04-13|-9999.0|   9|
|   16|126088|1775-04-14|-9999.0|   9|
|   16|126088|1775-04-15|-9999.0|   9|
|   16|126088|1775-04-16|-9999.0|   9|
|   16|126088|1775-04-17|-9999.0|   9|
|   16|126088|1775-04-18|-9999.0|   9|
|   16|126088|1775-04-19|-9999.0|   9|
|   16|126088|1775-04-20|-9999.0|   9|
+-----+------+----------+-------+----+
only showing top 20 rows



In [12]:
df.show()

+-----+------+----------+-------+----+
|STAID| SOUID|      DATE|     SS|Q_SS|
+-----+------+----------+-------+----+
|   16|126088|1775-04-01|-9999.0|   9|
|   16|126088|1775-04-02|-9999.0|   9|
|   16|126088|1775-04-03|-9999.0|   9|
|   16|126088|1775-04-04|-9999.0|   9|
|   16|126088|1775-04-05|-9999.0|   9|
|   16|126088|1775-04-06|-9999.0|   9|
|   16|126088|1775-04-07|-9999.0|   9|
|   16|126088|1775-04-08|-9999.0|   9|
|   16|126088|1775-04-09|-9999.0|   9|
|   16|126088|1775-04-10|-9999.0|   9|
|   16|126088|1775-04-11|-9999.0|   9|
|   16|126088|1775-04-12|-9999.0|   9|
|   16|126088|1775-04-13|-9999.0|   9|
|   16|126088|1775-04-14|-9999.0|   9|
|   16|126088|1775-04-15|-9999.0|   9|
|   16|126088|1775-04-16|-9999.0|   9|
|   16|126088|1775-04-17|-9999.0|   9|
|   16|126088|1775-04-18|-9999.0|   9|
|   16|126088|1775-04-19|-9999.0|   9|
|   16|126088|1775-04-20|-9999.0|   9|
+-----+------+----------+-------+----+
only showing top 20 rows



In [13]:
print((df.count(), len(df.columns)))



(47028860, 5)


                                                                                

In [14]:
df_spark = spark.read.parquet(output_path)
df_spark.printSchema()

root
 |-- STAID: long (nullable = true)
 |-- SOUID: long (nullable = true)
 |-- DATE: date (nullable = true)
 |-- SS: double (nullable = true)
 |-- Q_SS: long (nullable = true)



In [15]:
df_spark

DataFrame[STAID: bigint, SOUID: bigint, DATE: date, SS: double, Q_SS: bigint]

In [16]:
print((df_spark.count(), len(df_spark.columns)))

[Stage 13:>                                                         (0 + 8) / 8]

(47028860, 5)


                                                                                

### Spark Source file

In [18]:
source_file = "/Users/marcelaulloa/Data_Zoomcamp/de-project-zoomcamp/where_the_sun_shines_EU/data/ECA_blend_ss/sources/processed_sources.csv"

In [19]:
df_source = spark.read \
    .option("header", "true") \
    .csv(source_file)

In [20]:
df_source.show()

+-----+------+----------------------------------------+---+---------+----------+----+----+--------+--------+-----+--------------------+
|STAID| SOUID|SOUNAME                                 | CN|      LAT|       LON|HGHT|ELEI|   START|    STOP|PARID|             PARNAME|
+-----+------+----------------------------------------+---+---------+----------+----+----+--------+--------+-----+--------------------+
|   11|132864|                    KREMSMUENSTER (TA...| AT|+48:03:18|+014:07:51| 382| SS1|19880101|20211231|    3|Wolfgang Lipa    ...|
|   11|225516|                    KREMSMUENSTER    ...| AT|+48:03:19|+014:07:54| 383| SS3|19740101|20071031|    3|Wolfgang Lipa    ...|
|   11|236488|                    KREMSMUENSTER (TA...| AT|+48:03:18|+014:07:50| 382| SS3|18740101|20211231|    3|Wolfgang Lipa    ...|
|   11|911012|                    KREMSMUENSTER    ...| AT|+48:03:00|+014:07:59| 389| SS4|20150501|20230613|    -|Synoptical messag...|
|   12|132862|                    GRAZ-UNIVERSIT

In [21]:
df_source.schema

StructType([StructField('STAID', StringType(), True), StructField(' SOUID', StringType(), True), StructField('SOUNAME                                 ', StringType(), True), StructField('CN', StringType(), True), StructField('      LAT', StringType(), True), StructField('       LON', StringType(), True), StructField('HGHT', StringType(), True), StructField('ELEI', StringType(), True), StructField('   START', StringType(), True), StructField('    STOP', StringType(), True), StructField('PARID', StringType(), True), StructField('PARNAME', StringType(), True)])

In [22]:
new_columns = ['STAID','SOUID','SOUNAME','CN','LAT','LON','HGHT','ELEI','START', 'STOP', 'PARID', 'PARNAME']
df_source = df_source.toDF(*new_columns)

In [23]:
# Transform string column to LongType
df_source = df_source.withColumn("STAID", df_source["STAID"].cast("long"))
df_source = df_source.withColumn("SOUID", df_source["SOUID"].cast("long"))
df_source = df_source.withColumn("HGHT", df_source["HGHT"].cast("long"))
    
# Transform string column to TimestampType
df_source = df_source.withColumn("START", to_date("START", "yyyyMMdd"))
df_source = df_source.withColumn("STOP", to_date("STOP", "yyyyMMdd"))

In [24]:
df_source.schema

StructType([StructField('STAID', LongType(), True), StructField('SOUID', LongType(), True), StructField('SOUNAME', StringType(), True), StructField('CN', StringType(), True), StructField('LAT', StringType(), True), StructField('LON', StringType(), True), StructField('HGHT', LongType(), True), StructField('ELEI', StringType(), True), StructField('START', DateType(), True), StructField('STOP', DateType(), True), StructField('PARID', StringType(), True), StructField('PARNAME', StringType(), True)])

In [25]:
df_source.show()

+-----+------+--------------------+---+---------+----------+----+----+----------+----------+-----+--------------------+
|STAID| SOUID|             SOUNAME| CN|      LAT|       LON|HGHT|ELEI|     START|      STOP|PARID|             PARNAME|
+-----+------+--------------------+---+---------+----------+----+----+----------+----------+-----+--------------------+
|   11|132864|KREMSMUENSTER (TA...| AT|+48:03:18|+014:07:51| 382| SS1|1988-01-01|2021-12-31|    3|Wolfgang Lipa    ...|
|   11|225516|KREMSMUENSTER    ...| AT|+48:03:19|+014:07:54| 383| SS3|1974-01-01|2007-10-31|    3|Wolfgang Lipa    ...|
|   11|236488|KREMSMUENSTER (TA...| AT|+48:03:18|+014:07:50| 382| SS3|1874-01-01|2021-12-31|    3|Wolfgang Lipa    ...|
|   11|911012|KREMSMUENSTER    ...| AT|+48:03:00|+014:07:59| 389| SS4|2015-05-01|2023-06-13|    -|Synoptical messag...|
|   12|132862|GRAZ-UNIVERSITAET...| AT|+47:04:40|+015:26:56| 367| SS1|1988-06-01|2021-12-31|    3|Wolfgang Lipa    ...|
|   12|229065|GRAZ-UNIVERSITAET...| AT|+

### Join country table

In [26]:
# Read Parquet files with specified schema
df_spark = spark.read.parquet(output_path)


24/04/14 16:14:39 ERROR Executor: Exception in task 0.0 in stage 19.0 (TID 3928)
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.util.ThreadUtils$.parmap(ThreadUtils.scala:387)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.readParquetFootersInParallel(ParquetFileFormat.scala:443)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.$anonfun$mergeSchemasInParallel$1(ParquetFileFormat.scala:493)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.$anonfun$mergeSchemasInParallel$1$adapted(ParquetFileFormat.scala:485)
	at org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.$anonfun$mergeSchemasInParallel$2(SchemaMergeUtils.scala:80)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:858)
	at org.apache.sp

Py4JJavaError: An error occurred while calling o115.parquet.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 19.0 failed 1 times, most recent failure: Lost task 0.0 in stage 19.0 (TID 3928) (marcelas-air executor driver): org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.util.ThreadUtils$.parmap(ThreadUtils.scala:387)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.readParquetFootersInParallel(ParquetFileFormat.scala:443)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.$anonfun$mergeSchemasInParallel$1(ParquetFileFormat.scala:493)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.$anonfun$mergeSchemasInParallel$1$adapted(ParquetFileFormat.scala:485)
	at org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.$anonfun$mergeSchemasInParallel$2(SchemaMergeUtils.scala:80)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:858)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:858)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.spark.SparkException: [CANNOT_READ_FILE_FOOTER] Could not read footer for file: file:/Users/marcelaulloa/Data_Zoomcamp/de-project-zoomcamp/where_the_sun_shines_EU/data/ECA_blend_ss/sources/processed_sources.csv. Please ensure that the file is in either ORC or Parquet format. If not, please convert it to a valid format. If the file is in the valid format, please check if it is corrupt. If it is, you can choose to either ignore it or fix the corruption.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.cannotReadFooterForFileError(QueryExecutionErrors.scala:1056)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.$anonfun$readParquetFootersInParallel$1(ParquetFileFormat.scala:456)
	at org.apache.spark.util.ThreadUtils$.$anonfun$parmap$2(ThreadUtils.scala:384)
	at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
	at scala.util.Success.$anonfun$map$1(Try.scala:255)
	at scala.util.Success.map(Try.scala:213)
	at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
	at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
	at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
	at java.base/java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1426)
	at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
	at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
	at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
	at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
	at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
Caused by: java.lang.RuntimeException: file:/Users/marcelaulloa/Data_Zoomcamp/de-project-zoomcamp/where_the_sun_shines_EU/data/ECA_blend_ss/sources/processed_sources.csv is not a Parquet file. Expected magic number at tail, but found [32, 32, 32, 10]
	at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:565)
	at org.apache.parquet.hadoop.ParquetFileReader.<init>(ParquetFileReader.java:799)
	at org.apache.parquet.hadoop.ParquetFileReader.open(ParquetFileReader.java:666)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFooterReader.readFooter(ParquetFooterReader.java:85)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFooterReader.readFooter(ParquetFooterReader.java:76)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.$anonfun$readParquetFootersInParallel$1(ParquetFileFormat.scala:450)
	... 14 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2398)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2419)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2438)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2463)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1049)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1048)
	at org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.mergeSchemasInParallel(SchemaMergeUtils.scala:74)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.mergeSchemasInParallel(ParquetFileFormat.scala:497)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetUtils$.inferSchema(ParquetUtils.scala:132)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.inferSchema(ParquetFileFormat.scala:79)
	at org.apache.spark.sql.execution.datasources.DataSource.$anonfun$getOrInferFileFormatSchema$11(DataSource.scala:208)
	at scala.Option.orElse(Option.scala:447)
	at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:205)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:407)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:229)
	at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:211)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
	at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:563)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.util.ThreadUtils$.parmap(ThreadUtils.scala:387)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.readParquetFootersInParallel(ParquetFileFormat.scala:443)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.$anonfun$mergeSchemasInParallel$1(ParquetFileFormat.scala:493)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.$anonfun$mergeSchemasInParallel$1$adapted(ParquetFileFormat.scala:485)
	at org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.$anonfun$mergeSchemasInParallel$2(SchemaMergeUtils.scala:80)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:858)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:858)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more
Caused by: org.apache.spark.SparkException: [CANNOT_READ_FILE_FOOTER] Could not read footer for file: file:/Users/marcelaulloa/Data_Zoomcamp/de-project-zoomcamp/where_the_sun_shines_EU/data/ECA_blend_ss/sources/processed_sources.csv. Please ensure that the file is in either ORC or Parquet format. If not, please convert it to a valid format. If the file is in the valid format, please check if it is corrupt. If it is, you can choose to either ignore it or fix the corruption.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.cannotReadFooterForFileError(QueryExecutionErrors.scala:1056)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.$anonfun$readParquetFootersInParallel$1(ParquetFileFormat.scala:456)
	at org.apache.spark.util.ThreadUtils$.$anonfun$parmap$2(ThreadUtils.scala:384)
	at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
	at scala.util.Success.$anonfun$map$1(Try.scala:255)
	at scala.util.Success.map(Try.scala:213)
	at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
	at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
	at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
	at java.base/java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1426)
	at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
	at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
	at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
	at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
	at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
Caused by: java.lang.RuntimeException: file:/Users/marcelaulloa/Data_Zoomcamp/de-project-zoomcamp/where_the_sun_shines_EU/data/ECA_blend_ss/sources/processed_sources.csv is not a Parquet file. Expected magic number at tail, but found [32, 32, 32, 10]
	at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:565)
	at org.apache.parquet.hadoop.ParquetFileReader.<init>(ParquetFileReader.java:799)
	at org.apache.parquet.hadoop.ParquetFileReader.open(ParquetFileReader.java:666)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFooterReader.readFooter(ParquetFooterReader.java:85)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFooterReader.readFooter(ParquetFooterReader.java:76)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.$anonfun$readParquetFootersInParallel$1(ParquetFileFormat.scala:450)
	... 14 more


In [27]:
print((df_spark.count(), len(df_spark.columns)))

(47028860, 5)


In [28]:
df_spark.show()

+-----+------+----------+-------+----+
|STAID| SOUID|      DATE|     SS|Q_SS|
+-----+------+----------+-------+----+
| 4332| 25072|1965-01-07|    0.0|   0|
| 1686|  5192|2006-10-20|    7.0|   0|
|   16|126088|1793-03-20|-9999.0|   9|
|24406|130872|1880-04-02|   37.0|   0|
|24756|135422|1959-03-04|   18.0|   0|
|24405|130872|1972-10-22|   55.0|   0|
|24035|130898|1984-08-16|-9999.0|   9|
| 4332| 81081|1925-10-11|    0.0|   0|
|24405|126088|1820-04-02|-9999.0|   9|
|   45| 81081|1911-07-22|  128.0|   0|
|24034|130911|1987-10-08|   61.0|   0|
|24407|130898|1972-07-31|    0.0|   0|
|24758|135448|2005-02-13|   56.0|   0|
|  274|  5616|1931-02-02|   49.0|   0|
|24661|130885|1870-11-10|    0.0|   0|
|   48| 11539|1871-03-27|-9999.0|   9|
|24393|  2165|1989-10-24|   81.0|   0|
|   49| 15754|1861-05-11|-9999.0|   9|
|24661|134551|1956-03-25|    0.0|   0|
|24035|130885|1869-02-15|    0.0|   0|
+-----+------+----------+-------+----+
only showing top 20 rows



In [29]:
df_spark.createOrReplaceTempView('df_metrics')

In [30]:
df_spark = spark.sql("""
SELECT 
    STAID as Station_id,
    DATE,
    sum(SS) as Sun_hrs,
    Q_SS as Data_quality
FROM
    df_metrics
WHERE
    Q_SS = 0
GROUP BY
    1, 2,4
""")

In [31]:
from pyspark.sql import types

schema = types.StructType([
    types.StructField('STAID', types.LongType(), True),
    types.StructField('SOUID', types.LongType(), True),
    types.StructField('DATE', types.TimestampType(), True),
    types.StructField('SS', types.DoubleType(), True),
    types.StructField('Q_SS', types.LongType(), True)
])

In [32]:
output_path_metrics = "/Users/marcelaulloa/Data_Zoomcamp/de-project-zoomcamp/where_the_sun_shines_EU/data/ECA_blend_ss/spark_tables/metrics"

# Ensure the local folder exists
if not os.path.exists(output_path_metrics):
    os.makedirs(output_path_metrics)

df_spark \
    .repartition(24) \
    .write.parquet(output_path_metrics, mode='overwrite')

24/04/14 16:15:49 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/04/14 16:15:49 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/04/14 16:15:49 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/04/14 16:15:49 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/04/14 16:15:49 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/04/14 16:15:49 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/04/14 16:15:49 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/04/14 16:15:49 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/04/14 16:15:53 WARN RowBasedKeyValueBatch: Calling spill() on

In [33]:
df_source.createOrReplaceTempView('df_dimensions')

In [34]:
df_source.show()

+-----+------+--------------------+---+---------+----------+----+----+----------+----------+-----+--------------------+
|STAID| SOUID|             SOUNAME| CN|      LAT|       LON|HGHT|ELEI|     START|      STOP|PARID|             PARNAME|
+-----+------+--------------------+---+---------+----------+----+----+----------+----------+-----+--------------------+
|   11|132864|KREMSMUENSTER (TA...| AT|+48:03:18|+014:07:51| 382| SS1|1988-01-01|2021-12-31|    3|Wolfgang Lipa    ...|
|   11|225516|KREMSMUENSTER    ...| AT|+48:03:19|+014:07:54| 383| SS3|1974-01-01|2007-10-31|    3|Wolfgang Lipa    ...|
|   11|236488|KREMSMUENSTER (TA...| AT|+48:03:18|+014:07:50| 382| SS3|1874-01-01|2021-12-31|    3|Wolfgang Lipa    ...|
|   11|911012|KREMSMUENSTER    ...| AT|+48:03:00|+014:07:59| 389| SS4|2015-05-01|2023-06-13|    -|Synoptical messag...|
|   12|132862|GRAZ-UNIVERSITAET...| AT|+47:04:40|+015:26:56| 367| SS1|1988-06-01|2021-12-31|    3|Wolfgang Lipa    ...|
|   12|229065|GRAZ-UNIVERSITAET...| AT|+

In [35]:
df_source = spark.sql("""
SELECT 
    STAID as Station_id,
    CN as Country_code
FROM
    df_dimensions
GROUP BY
    1, 2
""")

In [36]:
output_path_source = "/Users/marcelaulloa/Data_Zoomcamp/de-project-zoomcamp/where_the_sun_shines_EU/data/ECA_blend_ss/spark_tables/dimensions"

# Ensure the local folder exists
if not os.path.exists(output_path_source):
    os.makedirs(output_path_source)

df_source \
    .repartition(20) \
    .write.parquet(output_path_source, mode='overwrite')

24/04/14 16:17:48 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
24/04/14 16:17:49 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
                                                                                

In [37]:
df_metrics = spark.read.parquet(output_path_metrics)
df_dimensions = spark.read.parquet(output_path_source)

In [38]:
df_dimensions.show()

+----------+------------+
|Station_id|Country_code|
+----------+------------+
|      4928|          DE|
|     24637|          AT|
|     24741|          AT|
|      4112|          DE|
|      3968|          ES|
|     11240|          SE|
|     22768|          DK|
|     24173|          AT|
|      4346|          DE|
|     24212|          AT|
|     24016|          AT|
|       329|          NO|
|     24094|          AT|
|        41|          DE|
|      4238|          DE|
|     25127|          PL|
|      4853|          DE|
|     25138|          PL|
|      2620|          NO|
|     24817|          AT|
+----------+------------+
only showing top 20 rows



In [39]:
df_join = df_metrics.join(df_dimensions, on=['Station_id'], how='left')

In [42]:
df_join.show()

+----------+----------+-------+------------+------------+
|Station_id|      DATE|Sun_hrs|Data_quality|Country_code|
+----------+----------+-------+------------+------------+
|     23973|1972-08-29|   34.0|           0|          AT|
|       986|1954-11-11|   32.0|           0|          LV|
|     11775|1977-12-24|    2.0|           0|          DE|
|      4138|2003-06-16|  124.0|           0|          DE|
|      4138|2003-06-16|  124.0|           0|          NL|
|      4021|1950-11-22|   48.0|           0|          DE|
|      4685|1991-07-15|  123.0|           0|          DE|
|       333|2003-08-04|  116.0|           0|          PL|
|     23946|1999-12-01|    0.0|           0|          AT|
|     11207|1984-01-12|   80.0|           0|          ES|
|     24674|2012-05-02|   98.0|           0|          AT|
|      2943|1976-11-22|    0.0|           0|          IS|
|      4908|2013-11-09|   31.0|           0|          DE|
|       489|2016-05-11|   63.0|           0|          DE|
|      4054|19

In [45]:
output_path_source = "/Users/marcelaulloa/Data_Zoomcamp/de-project-zoomcamp/where_the_sun_shines_EU/data/ECA_blend_ss/spark_tables/report"

# Ensure the local folder exists
if not os.path.exists(output_path_source):
    os.makedirs(output_path_source)


df_join.write.parquet(output_path_source, mode='overwrite')

24/04/14 16:20:23 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
                                                                                

In [46]:
df_join = spark.read.parquet(output_path_source)

In [47]:
df_join

DataFrame[Station_id: bigint, DATE: date, Sun_hrs: double, Data_quality: bigint, Country_code: string]

In [48]:
df_join.show()

+----------+----------+-------+------------+--------------------+
|Station_id|      DATE|Sun_hrs|Data_quality|        Country_code|
+----------+----------+-------+------------+--------------------+
|       230|1992-08-12|  117.0|           0|                  ES|
|      4451|1987-09-30|  103.0|           0|                  DE|
|     23977|2010-09-18|   54.0|           0|                  AT|
|     10901|1991-07-12|  126.0|           0|                  HR|
|      4298|1942-08-16|   21.0|           0|                  DE|
|      3917|1990-09-18|   97.0|           0|                  ES|
|       262|1952-06-05|  121.0|           0|                  RS|
|     24342|1978-04-05|    8.0|           0|                  AT|
|     24418|1942-01-21|   48.0|           0|                  AT|
|     24138|2022-11-14|    0.0|           0|                  CH|
|     24138|2022-11-14|    0.0|           0|                  AT|
|      4002|1999-08-27|   16.0|           0|                  DE|
|     2469

In [49]:
# Stop the SparkSession
spark.stop()