In [1]:
from etl.spark_singleton import SparkSingleton
from pyspark.sql.functions import col, lit
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType, BooleanType, IntegerType
import seaborn as sns
import matplotlib.pyplot as plt
from collections.abc import Iterable

# Obtain the Spark entry point from the project's SparkSingleton helper.
# All parquet reads in this notebook go through `spark.read`.
# NOTE(review): presumably a shared SparkSession — confirm in etl.spark_singleton.
spark = SparkSingleton.get_spark()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/29 11:08:38 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/05/29 11:08:39 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


### Import data

In [2]:
# Directory that holds all input parquet datasets. Change it here to repoint
# every input at once instead of editing three separate literals.
DATA_ROOT = "../../../www-data"

# Input datasets read by the cells below (values are unchanged from before).
DETECTION_PATH = f"{DATA_ROOT}/JoinedQPP"    # detection table
SOURCE_PATH = f"{DATA_ROOT}/vvvSrc5"         # source table
VARIABILITY_PATH = f"{DATA_ROOT}/vvvVar"     # variability table (used for validation)

#### Detection

In [3]:
# Read the detection parquet dataset and show how many rows it holds.
detection = spark.read.format("parquet").load(DETECTION_PATH)
detection.count()

43016907

Validate that every `sourceID` in this table satisfies `sourceID % 2096 == 3`:

In [4]:
# Check that every detection row has sourceID % 2096 == 3.
# `filter` keeps only the rows that satisfy the predicate, so the two counts
# are equal only if all rows match. (`select` would just project a boolean
# column and keep every row, making this comparison trivially True.)
detection.filter(col("sourceID") % 2096 == 3).count() == detection.count()

True

#### Source

In [5]:
# Read the source parquet dataset and show how many rows it holds.
source = spark.read.format("parquet").load(SOURCE_PATH)
source.count()

264840

In [6]:
# Check that every source row has sourceID % 2096 == 3.
# `filter` drops non-matching rows, so equal counts mean all rows match;
# `select` would keep every row and make the comparison always True.
source.filter(col("sourceID") % 2096 == 3).count() == source.count()

True

#### Variability (for validation)

In [7]:
# Read the variability parquet dataset (used for validation below).
var = spark.read.format("parquet").load(VARIABILITY_PATH)

In [8]:
# Check that every variability row has sourceID % 2096 == 3.
# `filter` drops non-matching rows, so equal counts mean all rows match;
# `select` would keep every row and make the comparison always True.
var.filter(col("sourceID") % 2096 == 3).count() == var.count()

True

### Validate that all sources occur in detection and all detections match a source

In [9]:
# Pull the distinct sourceID values of each table onto the driver as plain
# Python lists so they can be compared with set arithmetic below.
source_source_ids = [
    row["sourceID"] for row in source.select("sourceID").distinct().collect()
]
detection_source_ids = [
    row["sourceID"] for row in detection.select("sourceID").distinct().collect()
]

                                                                                

Number of source IDs that have no detections:

In [10]:
# Count the source IDs that never appear in the detection table.
len(set(source_source_ids).difference(detection_source_ids))

34271

Number of distinct source IDs in the detection table that are not in the Source table:

In [11]:
# Count the detection source IDs that are absent from the source table.
len(set(detection_source_ids).difference(source_source_ids))

0

### Subset data

In [12]:
# Draw a small random sample of the source table to serve as example data.
# `fraction` is a per-row inclusion probability, so the resulting row count is
# only approximately fraction * source.count().
SAMPLE_FRACTION = 0.001
# Fixed seed so that re-running the notebook on the same input selects the
# same sources, keeping the exported example data stable across runs.
SAMPLE_SEED = 42

source_subset = source.sample(fraction=SAMPLE_FRACTION, seed=SAMPLE_SEED)
source_subset.count()

313

In [13]:
# Collect the sampled source IDs on the driver, then keep only the variability
# and detection rows whose sourceID is one of them.
source_ids = [
    row["sourceID"] for row in source_subset.select("sourceID").distinct().collect()
]
var_subset = var.where(col("sourceID").isin(source_ids))
detection_subset = detection.where(col("sourceID").isin(source_ids))

In [14]:
# Show how many variability rows belong to the sampled sources.
var_subset.count()

269

### Load

In [17]:
# Persist the sampled source rows, replacing any previous export.
source_subset.write.mode("overwrite").parquet("../example_data/source")

                                                                                

In [18]:
# Persist the matching variability rows, replacing any previous export.
var_subset.write.mode("overwrite").parquet("../example_data/variability")

In [19]:
# Persist the matching detection rows, replacing any previous export.
detection_subset.write.mode("overwrite").parquet("../example_data/detection")

                                                                                