In [None]:
# Notebook dependencies, installed into the kernel's environment through the %pip magic.
%pip install pandas
# PySpark is the only package pinned to an exact version (3.5.0); the rest are unpinned.
%pip install pyspark==3.5.0
# -q silences pip's output for this install only.
%pip install -q findspark
%pip install py4j
%pip install pyarrow
%pip install numpy

In [1]:
# for local operations
import os

# Locations of the local Java and Hadoop installations on this Windows machine.
# These must be set before the Spark session (and its JVM) is started.
os.environ['JAVA_HOME'] = "C:\\Program Files\\Java\\jdk-11"
os.environ['HADOOP_HOME'] = "C:\\hadoop-3.3.6"

# Put %HADOOP_HOME%\bin on PATH so the JVM can find Hadoop's native Windows helpers
# (hadoop.dll / winutils.exe). A missing native library is the usual cause of
# `UnsatisfiedLinkError: NativeIO$Windows.access0` when Spark writes files locally.
# NOTE(review): this assumes hadoop.dll and winutils.exe are present in that folder - confirm.
_hadoop_bin = os.path.join(os.environ['HADOOP_HOME'], 'bin')
if _hadoop_bin not in os.environ.get('PATH', ''):
    # Guarded so that re-running the cell does not prepend the same entry again.
    os.environ['PATH'] = _hadoop_bin + os.pathsep + os.environ.get('PATH', '')

import pandas as pd
from pyspark.sql import functions as f

In [2]:
from pyspark.sql import SparkSession

# Reuse the active Spark session when there is one; otherwise start a new one
# with the builder's default settings.
spark = SparkSession.builder.getOrCreate()

# Bare last expression: the notebook renders the session summary.
spark

In [3]:
from pyspark.sql import types as T

# Explicit schema for the raw time-series JSON, one entry per column:
# (column name, Spark data type, nullable).
_raw_columns = [
    ('contract_id', T.StringType(), False),
    ('timestamp', T.TimestampType(), False),
    ('value', T.DoubleType(), True),
    ('value_source', T.StringType(), False),
    ('annotations', T.StringType(), True),
]
data_schema = T.StructType([
    T.StructField(col_name, col_type, is_nullable)
    for col_name, col_type, is_nullable in _raw_columns
])

# Read the single JSON part file with the schema above instead of letting Spark infer one.
_json_reader = spark.read.format('json').schema(data_schema)
df_raw_time_series = _json_reader.load(
    'project_data/project/raw_time_series/json/part-00000-3992c3d5-97c8-4e7d-8328-b7c8379135e8-c000.json'
)

# Preview the first rows and the schema that was applied.
df_raw_time_series.show()
df_raw_time_series.printSchema()

+--------------------+-------------------+-------------------+------------+--------------------+
|         contract_id|          timestamp|              value|value_source|         annotations|
+--------------------+-------------------+-------------------+------------+--------------------+
|  04 _02 _111 _CHR12|2023-01-01 03:15:00|0.01887007418980177| measurement|{"region":"Europe...|
|  04_02 _ 111 _CHR12|2023-01-01 16:15:00|           1.266225| measurement|{"region":"Europe...|
|  04_02_ 111 _CHR12 |2023-01-01 23:45:27|               NULL| measurement|{"region":"Europe...|
|  04 _02 _111 _CHR12|2023-01-02 03:30:00|0.02093419915470171| measurement|{"region":"Europe...|
|   04 _02_111_CHR12 |2023-01-02 09:45:00| 0.0142526590154003| measurement|{"region":"Europe...|
|  04_02 _ 111 _CHR12|2023-01-02 18:45:00|0.07874463371047623| measurement|{"region":"Europe...|
|   04 _02_111_CHR12 |2023-01-03 00:45:00| 0.1950477376343327| measurement|{"region":"Europe...|
|  04_02_ 111 _CHR12 |2023-01-

Data clean-up step. Run this cell only once after loading the data: if it is run more than once, the values may be turned to NULL.

In [4]:
# Width, in seconds, of the grid that timestamps are rounded to (15 minutes).
QUARTER_HOUR_SECONDS = 15 * 60

# Seconds since the epoch for each row. The explicit cast lets this expression accept both
# the TimestampType column produced by the initial load and the 'yyyy-MM-dd HH:mm' string
# that this cell writes back. Without the cast, a second run would parse that string with
# unix_timestamp's default 'yyyy-MM-dd HH:mm:ss' pattern and produce NULL.
epoch_seconds = f.unix_timestamp(f.col('timestamp').cast('timestamp'))

# Round to the nearest quarter hour, then return to whole seconds for from_unixtime.
rounded_epoch = (
    f.round(epoch_seconds / QUARTER_HOUR_SECONDS) * QUARTER_HOUR_SECONDS
).cast('long')

df_raw_time_series = (
    df_raw_time_series
    # Strip every space character from the contract identifier.
    .withColumn('contract_id', f.regexp_replace(f.col('contract_id'), ' ', ''))
    # Rows without a value are relabelled as 'missing'; all others keep their source.
    .withColumn('value_source', f.when(f.col('value').isNull(), 'missing').otherwise(f.col('value_source')))
    # Store the rounded timestamp as a 'yyyy-MM-dd HH:mm' string (seconds dropped).
    .withColumn('timestamp', f.from_unixtime(rounded_epoch, 'yyyy-MM-dd HH:mm'))
)
df_raw_time_series.show()

+---------------+----------------+-------------------+------------+--------------------+
|    contract_id|       timestamp|              value|value_source|         annotations|
+---------------+----------------+-------------------+------------+--------------------+
|04_02_111_CHR12|2023-01-01 03:15|0.01887007418980177| measurement|{"region":"Europe...|
|04_02_111_CHR12|2023-01-01 16:15|           1.266225| measurement|{"region":"Europe...|
|04_02_111_CHR12|2023-01-01 23:45|               NULL|     missing|{"region":"Europe...|
|04_02_111_CHR12|2023-01-02 03:30|0.02093419915470171| measurement|{"region":"Europe...|
|04_02_111_CHR12|2023-01-02 09:45| 0.0142526590154003| measurement|{"region":"Europe...|
|04_02_111_CHR12|2023-01-02 18:45|0.07874463371047623| measurement|{"region":"Europe...|
|04_02_111_CHR12|2023-01-03 00:45| 0.1950477376343327| measurement|{"region":"Europe...|
|04_02_111_CHR12|2023-01-03 01:00| 0.1494432740331963| measurement|{"region":"Europe...|
|04_02_111_CHR12|2023

Extracting a new "region" column from "annotations"

In [5]:
# Only the `region` key of the annotations JSON is parsed.
json_schema = T.StructType([T.StructField('region', T.StringType(), False)])

# Parse the JSON string into a struct and pull out its `region` field.
parsed_annotations = f.from_json(f.col('annotations'), json_schema)
df_raw_time_series = df_raw_time_series.withColumn('region', parsed_annotations['region'])
df_raw_time_series.show()

# List the distinct region values to check which regions occur in the data.
df_raw_time_series.select('region').distinct().show()

+---------------+----------------+-------------------+------------+--------------------+-------------+
|    contract_id|       timestamp|              value|value_source|         annotations|       region|
+---------------+----------------+-------------------+------------+--------------------+-------------+
|04_02_111_CHR12|2023-01-01 03:15|0.01887007418980177| measurement|{"region":"Europe...|Europe/Berlin|
|04_02_111_CHR12|2023-01-01 16:15|           1.266225| measurement|{"region":"Europe...|Europe/Berlin|
|04_02_111_CHR12|2023-01-01 23:45|               NULL|     missing|{"region":"Europe...|Europe/Berlin|
|04_02_111_CHR12|2023-01-02 03:30|0.02093419915470171| measurement|{"region":"Europe...|Europe/Berlin|
|04_02_111_CHR12|2023-01-02 09:45| 0.0142526590154003| measurement|{"region":"Europe...|Europe/Berlin|
|04_02_111_CHR12|2023-01-02 18:45|0.07874463371047623| measurement|{"region":"Europe...|Europe/Berlin|
|04_02_111_CHR12|2023-01-03 00:45| 0.1950477376343327| measurement|{"regi

Customers with invalid regions will be removed from the database and saved to disk in a separate location.
- Assumption: a valid region follows the format `continent/city`.

In [None]:
# A region is treated as invalid when it is NULL, blank, or does not match
# `letters/letters` (the assumed `continent/city` format).
region = f.col('region')
is_invalid_region = (
    region.isNull()
    | (f.trim(region) == '')
    | (f.regexp_extract(region, r'^[A-Za-z]+/[A-Za-z]+$', 0) == '')
)

# Rows set aside because of an invalid region.
df_invalid_region = df_raw_time_series.filter(is_invalid_region)

df_invalid_region.show()
# df_invalid_region.write.json('invalid_regions', mode="overwrite") - doesn't work locally, will update to new environment soon

# Remove the invalid rows from the main frame so later steps only see usable regions.
# The predicate is never NULL (a NULL region is caught by isNull()), so negating it keeps
# exactly the remaining rows.
# NOTE(review): an unknown zone id such as 'WakaWaka' is expected to make
# from_utc_timestamp fail on Spark 3.x - confirm.
# NOTE: df_invalid_region was built from the unfiltered frame above; re-running this cell
# after the removal would yield an empty df_invalid_region.
df_raw_time_series = df_raw_time_series.filter(~is_invalid_region)

+---------------+----------------+--------------------+------------+--------------------+--------+
|    contract_id|       timestamp|               value|value_source|         annotations|  region|
+---------------+----------------+--------------------+------------+--------------------+--------+
|01_02_155_CHR98|2023-01-01 05:00|  0.1407119333744049| measurement|{"region":"WakaWa...|WakaWaka|
|01_02_155_CHR98|2023-01-01 07:00| 0.04566259682178497| measurement|{"region":"WakaWa...|WakaWaka|
|01_02_155_CHR98|2023-01-01 07:30| 0.10312729328870773| measurement|{"region":"WakaWa...|WakaWaka|
|01_02_155_CHR98|2023-01-01 08:30| 0.06968000531196594| measurement|{"region":"WakaWa...|WakaWaka|
|01_02_155_CHR98|2023-01-01 09:30| 0.11096686869859695| measurement|{"region":"WakaWa...|WakaWaka|
|01_02_155_CHR98|2023-01-01 09:45|   0.241357684135437| measurement|{"region":"WakaWa...|WakaWaka|
|01_02_155_CHR98|2023-01-01 12:30| 0.07323921471834183| measurement|{"region":"WakaWa...|WakaWaka|
|01_02_155

Py4JJavaError: An error occurred while calling o81.json.
: java.lang.UnsatisfiedLinkError: 'boolean org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(java.lang.String, int)'
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:793)
	at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:1249)
	at org.apache.hadoop.fs.FileUtil.list(FileUtil.java:1454)
	at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:601)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
	at org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:761)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.getAllCommittedTaskPaths(FileOutputCommitter.java:334)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJobInternal(FileOutputCommitter.java:404)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:377)
	at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:192)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$writeAndCommit$3(FileFormatWriter.scala:275)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:552)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.writeAndCommit(FileFormatWriter.scala:275)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeWrite(FileFormatWriter.scala:304)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:190)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:190)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:859)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:388)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:361)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:240)
	at org.apache.spark.sql.DataFrameWriter.json(DataFrameWriter.scala:774)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:834)


Extract date from the `timestamp` column in a new `utc_date` column

In [7]:
# Date part of `timestamp`, formatted as a 'yyyy-MM-dd' string, stored in `utc_date`.
utc_date_expr = f.date_format('timestamp', 'yyyy-MM-dd')
df_raw_time_series = df_raw_time_series.withColumn('utc_date', utc_date_expr)
df_raw_time_series.show()

+---------------+----------------+-------------------+------------+--------------------+-------------+----------+
|    contract_id|       timestamp|              value|value_source|         annotations|       region|  utc_date|
+---------------+----------------+-------------------+------------+--------------------+-------------+----------+
|04_02_111_CHR12|2023-01-01 03:15|0.01887007418980177| measurement|{"region":"Europe...|Europe/Berlin|2023-01-01|
|04_02_111_CHR12|2023-01-01 16:15|           1.266225| measurement|{"region":"Europe...|Europe/Berlin|2023-01-01|
|04_02_111_CHR12|2023-01-01 23:45|               NULL|     missing|{"region":"Europe...|Europe/Berlin|2023-01-01|
|04_02_111_CHR12|2023-01-02 03:30|0.02093419915470171| measurement|{"region":"Europe...|Europe/Berlin|2023-01-02|
|04_02_111_CHR12|2023-01-02 09:45| 0.0142526590154003| measurement|{"region":"Europe...|Europe/Berlin|2023-01-02|
|04_02_111_CHR12|2023-01-02 18:45|0.07874463371047623| measurement|{"region":"Europe...|

Convert the UTC `timestamp` to the local date and time of each row's `region`, stored in a new `local_timestamp` column

In [8]:
# Interpret `timestamp` as UTC and shift it into the time zone named by each row's `region`.
local_timestamp_expr = f.from_utc_timestamp(f.col('timestamp'), f.col('region'))
df_raw_time_series = df_raw_time_series.withColumn("local_timestamp", local_timestamp_expr)

df_raw_time_series.show()

+---------------+----------------+-------------------+------------+--------------------+-------------+----------+-------------------+
|    contract_id|       timestamp|              value|value_source|         annotations|       region|  utc_date|    local_timestamp|
+---------------+----------------+-------------------+------------+--------------------+-------------+----------+-------------------+
|04_02_111_CHR12|2023-01-01 03:15|0.01887007418980177| measurement|{"region":"Europe...|Europe/Berlin|2023-01-01|2023-01-01 04:15:00|
|04_02_111_CHR12|2023-01-01 16:15|           1.266225| measurement|{"region":"Europe...|Europe/Berlin|2023-01-01|2023-01-01 17:15:00|
|04_02_111_CHR12|2023-01-01 23:45|               NULL|     missing|{"region":"Europe...|Europe/Berlin|2023-01-01|2023-01-02 00:45:00|
|04_02_111_CHR12|2023-01-02 03:30|0.02093419915470171| measurement|{"region":"Europe...|Europe/Berlin|2023-01-02|2023-01-02 04:30:00|
|04_02_111_CHR12|2023-01-02 09:45| 0.0142526590154003| measure

Extract date from the `local_timestamp` column in a new `local_date` column

In [9]:
# Calendar date of the local timestamp, stored in `local_date`.
local_date_expr = f.to_date(f.col('local_timestamp'))
df_raw_time_series = df_raw_time_series.withColumn("local_date", local_date_expr)

df_raw_time_series.show()

+---------------+----------------+-------------------+------------+--------------------+-------------+----------+-------------------+----------+
|    contract_id|       timestamp|              value|value_source|         annotations|       region|  utc_date|    local_timestamp|local_date|
+---------------+----------------+-------------------+------------+--------------------+-------------+----------+-------------------+----------+
|04_02_111_CHR12|2023-01-01 03:15|0.01887007418980177| measurement|{"region":"Europe...|Europe/Berlin|2023-01-01|2023-01-01 04:15:00|2023-01-01|
|04_02_111_CHR12|2023-01-01 16:15|           1.266225| measurement|{"region":"Europe...|Europe/Berlin|2023-01-01|2023-01-01 17:15:00|2023-01-01|
|04_02_111_CHR12|2023-01-01 23:45|               NULL|     missing|{"region":"Europe...|Europe/Berlin|2023-01-01|2023-01-02 00:45:00|2023-01-02|
|04_02_111_CHR12|2023-01-02 03:30|0.02093419915470171| measurement|{"region":"Europe...|Europe/Berlin|2023-01-02|2023-01-02 04:30: