# Install Python dependencies

In [1]:
!python -V

# Set up dependencies
!pip install pyjson5
!pip install pyjson

# dynamically load changes to code
%load_ext autoreload
%autoreload 2

# Absolute location where the custom module is expected to live.
base_path = '/content/experiment_module'

# update the path so the custom module can be loaded
import sys
# Guard the insert so that re-running this cell does not stack duplicate
# entries on sys.path.
if base_path not in sys.path:
    sys.path.insert(1, base_path)

Python 3.10.12
Collecting pyjson5
  Downloading pyjson5-1.6.6-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (357 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m357.7/357.7 kB[0m [31m2.2 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: pyjson5
Successfully installed pyjson5-1.6.6
Collecting pyjson
  Downloading pyjson-1.4.1-py3-none-any.whl (4.6 kB)
Installing collected packages: pyjson
Successfully installed pyjson-1.4.1


In [2]:

# Set up custom module
# see https://saturncloud.io/blog/how-to-import-custom-modules-in-google-colab/
# The repository is cloned into ./experiment_module, relative to the current
# working directory. base_path is the absolute '/content/experiment_module',
# so this assumes the notebook runs with /content as its working directory.
# NOTE(review): on a runtime where experiment_module already exists this clone
# is expected to fail; use the "git pull" cell to refresh instead - confirm.
!git clone https://github.com/prule/data-processing-experiment-python.git experiment_module


Cloning into 'experiment_module'...
remote: Enumerating objects: 106, done.[K
remote: Counting objects: 100% (106/106), done.[K
remote: Compressing objects: 100% (66/66), done.[K
remote: Total 106 (delta 29), reused 99 (delta 22), pack-reused 0[K
Receiving objects: 100% (106/106), 17.23 KiB | 4.31 MiB/s, done.
Resolving deltas: 100% (29/29), done.


## Pull latest code if required

In [3]:
# Fetch and integrate upstream changes into the existing experiment_module clone.
# The cd only applies to this one shell invocation; the notebook's working
# directory is unchanged afterwards.
!cd experiment_module && git pull

Already up to date.



# Initialize Spark

In [4]:
# Refresh the apt package index, then install a headless Java 8 JDK
# (quiet mode, stdout discarded).
!sudo apt update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# Download and unpack a standalone Spark 3.2.1 distribution into the working directory.
# Check this site for the latest download link: https://www.apache.org/dyn/closer.lua/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
# NOTE(review): the SPARK_HOME assignment further down this cell is commented
# out, so nothing here points Spark at the unpacked directory; presumably the
# pip-installed pyspark is what actually runs - confirm whether this download
# is still needed.
# NOTE(review): wget -q prints nothing, so a failed download would only show up
# as an error from the tar step.
!wget -q https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
!tar xf spark-3.2.1-bin-hadoop3.2.tgz
# Python-side packages: findspark (quiet install), pyspark and py4j.
!pip install -q findspark
!pip install pyspark
!pip install py4j

import os
import sys
# Explicit JAVA_HOME / SPARK_HOME overrides are currently disabled, so
# findspark.init() below is called with no arguments and locates Spark itself.
# os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
# os.environ["SPARK_HOME"] = "/content/spark-3.2.1-bin-hadoop3.2"


import findspark
# Let findspark locate a Spark installation before pyspark is imported.
findspark.init()
# NOTE(review): the result of find() is discarded, and since this is not the
# last expression of the cell it is not displayed either - presumably left
# over from debugging; confirm whether it can be removed.
findspark.find()

import pyspark

from pyspark.sql import DataFrame, SparkSession
from typing import List
import pyspark.sql.types as T
import pyspark.sql.functions as F

# Build the notebook-wide Spark session, or reuse one that is already active.
spark = (
    SparkSession.builder
    .appName("data processing experiment")
    .getOrCreate()
)


[33m0% [Working][0m            Get:1 http://security.ubuntu.com/ubuntu jammy-security InRelease [110 kB]
[33m0% [Connecting to archive.ubuntu.com] [1 InRelease 11.3 kB/110 kB 10%] [Waiting[0m                                                                               Get:2 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,626 B]
Get:3 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease [1,581 B]
Hit:4 http://archive.ubuntu.com/ubuntu jammy InRelease
Get:5 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [119 kB]
Get:6 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  Packages [808 kB]
Hit:7 https://ppa.launchpadcontent.net/c2d4u.team/c2d4u4.0+/ubuntu jammy InRelease
Get:8 http://security.ubuntu.com/ubuntu jammy-security/main amd64 Packages [1,694 kB]
Get:9 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease [18.1 kB]
Hit:10 https://ppa.launchpadcontent.net/graphics-d

### Load table configuration
Load the JSON5 table configuration so we can work with it.

In [5]:
from experiment_module.src.core_prule.JsonRepository import JsonRepository
from experiment_module.src.core_prule.Configuration import Sources
import json

# Load the JSON5 table configuration from the cloned module.
# The path is derived from base_path (defined in the setup cell), the same way
# App.go builds it, so the module location is defined in a single place.
json_repo = JsonRepository()
table_config = json_repo.load_file(base_path + '/config/sample1/sample1.tables.json5')

# Show the loaded configuration.
# NOTE(review): if JsonRepository.print already writes to stdout and returns
# None, the outer print() would emit an extra "None" line - confirm.
print(json_repo.print(table_config))

# Convert the raw configuration dictionary into the module's Sources model.
sources = Sources.from_dict(table_config)

{
  "id": "sample1",
  "name": "Sample 1",
  "description": "Sample 1 is a basic dataset configuration to demonstrate capability",
  "sources": [
    {
      "id": "transactions",
      "name": "Transactions",
      "description": "Transactions contains transactions from multiple bank accounts",
      "path": "sample1/transactions/",
      "type": "csv",
      "table": {
        "name": "transactions",
        "description": "account transactions",
        "deduplicate": true,
        "trim": true,
        "columns": [
          {
            "names": [
              "date"
            ],
            "alias": "date",
            "description": "date of transaction",
            "type": {
              "type": "com.example.dataprocessingexperiment.spark.data.types.DateType",
              "formats": [
                "yyyy-MM-dd",
                "dd-MM-yyyy"
              ]
            },
            "required": true
          },
          {
            "names": [
              "accoun

# Example use
This code replicates the reference app and shows how we can use the code from the custom module to load data from configuration.


In [6]:
from src.core_prule.Context import Context
from src.core_prule.DataFrameBuilder import DataFrameBuilder


class App:
    """Example driver that loads every configured source and prints each stage.

    For each source in the sample1 table configuration, the raw, selected and
    typed DataFrames are built and displayed; the typed DataFrame is also
    stored in a Context under the source's key.
    """

    def go(self):
        """Load the sample1 configuration and process each of its sources.

        The Spark session is used as a context manager for the duration of
        the run.
        """
        config_file = base_path + '/config/sample1/sample1.tables.json5'
        sources = Sources.from_dict(JsonRepository().load_file(config_file))

        session_builder = SparkSession.builder.appName("Data Processing Experiment").master("local")
        with session_builder.getOrCreate() as spark:
            context = Context(sources)

            for source in sources.sources:
                builder = DataFrameBuilder(source, base_path + "/data/", spark)

                # RAW: everything is a string, and all columns are included.
                raw_df = builder.raw()
                self.display("raw", raw_df)

                # SELECTED: still all strings, but only the configured columns.
                # Values will be trimmed if specified, and columns will be aliased.
                selected_df = builder.selected()
                self.display("selected", selected_df)

                # TYPED: columns and types as specified in the configuration.
                typed_df = builder.typed()
                self.display("typed", typed_df)

                # Make the typed result available through the context.
                context.put(source.key, typed_df)

    def display(self, name: str, df: DataFrame):
        """Print a labelled summary of ``df``.

        Output is the label surrounded by blank lines, the schema, up to 100
        rows sorted by the first column (cell text truncated to 10
        characters), and the row count.

        :param name: label printed above the DataFrame output
        :param df: DataFrame to summarise
        """
        # One call that emits: blank line, label, blank line.
        print("", name, "", sep="\n")

        df.printSchema()

        first_column = df.columns[0]
        ordered = df.sort(first_column)
        ordered.show(100, 10)

        row_total = df.count()
        print(f"Row count = {row_total}")


# Run the example end to end; App.go prints the raw, selected and typed
# output for every configured source.
app = App()
app.go()



raw

root
 |-- date: string (nullable = true)
 |-- account: string (nullable = true)
 |-- description: string (nullable = true)
 |-- amount: string (nullable = true)
 |-- location: string (nullable = true)
 |-- comment: string (nullable = true)

+----------+-------+-----------+-------+----------+----------+
|      date|account|description| amount|  location|   comment|
+----------+-------+-----------+-------+----------+----------+
|      NULL|      x|     tennis|   0.03|      NULL|   no date|
| 2020-0...|      1|     tennis|  35.00|  Banana  |      NULL|
| 2020-0...|      2|     petrol| 150.45|Central...|      NULL|
|01-03-2020|      1|     burger|  15.47|Greater...|alterna...|
|03-03-2020|      1|     tennis|  35.03| Maroondah|alterna...|
|04-03-2020|      2|     petrol| 150.47|      NULL|alterna...|
|05-03-2020|      2|     petrol|  50.47|   Burwood|      NULL|
|05-03-2020|      2|     petrol|  50.48|  Berrigan|      NULL|
|2020-01...|      1|  burgers  |  15.45|  Gympie  |      NUL